http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java deleted file mode 100644 index ff2e40b..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.dataflow.common.data.partition.range; - -public interface IRangeMap { - - public int getSplitCount(); - - public byte[] getByteArray(int columnIndex, int splitIndex); - - public int getStartOffset(int columnIndex, int splitIndex); - - public int getLength(int columnIndex, int splitIndex); - - public int getTag(int columnIndex, int splitIndex); - - // Min value functions - public byte[] getMinByteArray(int columnIndex); - - public int getMinStartOffset(int columnIndex); - - public int getMinLength(int columnIndex); - - public int getMinTag(int columnIndex); - - // Max value functions - public byte[] getMaxByteArray(int columnIndex); - - public int getMaxStartOffset(int columnIndex); - - public int getMaxLength(int columnIndex); - - public int getMaxTag(int columnIndex); -}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangePartitionType.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangePartitionType.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangePartitionType.java deleted file mode 100644 index dcde70b..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangePartitionType.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.dataflow.common.data.partition.range; - -public interface IRangePartitionType { - public enum RangePartitioningType { - /** - * Partitioning is determined by finding the range partition where the first data point lies. - */ - PROJECT, - /** - * Partitioning is determined by finding the range partition where the last data point lies. - */ - PROJECT_END, - /** - * Partitioning is determined by finding all the range partitions where the data has a point. - */ - SPLIT, - /** - * Partitioning is determined by finding all the range partitions where the data has a point - * or comes after the data point. - */ - REPLICATE - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java index 00fb86d..c15d39a 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java @@ -20,6 +20,8 @@ package org.apache.hyracks.dataflow.common.data.partition.range; import java.io.Serializable; +import org.apache.hyracks.api.dataflow.value.IRangeMap; + /** * The range map stores the field split values in an byte array. * The first and last split values for each column represent the min and max values (not actually split values). http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java new file mode 100644 index 0000000..befaad9 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.dataflow.std.base; + +import java.io.Serializable; + +/** + * Represents a range id in a logical plan. + */ +public final class RangeId implements Serializable { + private static final long serialVersionUID = 1L; + private final int id; + + public RangeId(int id) { + this.id = id; + } + + public int getId() { + return id; + } + + @Override + public String toString() { + return "RangeId(#" + id + ")"; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof RangeId)) { + return false; + } else { + return id == ((RangeId) obj).getId(); + } + } + + @Override + public int hashCode() { + return id; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java index 068d11a..520ddd9 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java @@ -34,24 +34,28 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor; +import org.apache.hyracks.dataflow.std.base.RangeId; import org.apache.hyracks.dataflow.std.collectors.IPartitionBatchManager; import org.apache.hyracks.dataflow.std.collectors.NonDeterministicPartitionBatchManager; import org.apache.hyracks.dataflow.std.collectors.PartitionCollector; import org.apache.hyracks.dataflow.std.collectors.SortMergeFrameReader; +import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState; public class MToNRangePartitionMergingConnectorDescriptor extends AbstractMToNConnectorDescriptor { private static final long serialVersionUID = 1L; private final ITupleRangePartitionComputerFactory tprcf; + private final RangeId rangeId; private final int[] sortFields; private final IBinaryComparatorFactory[] comparatorFactories; private final INormalizedKeyComputerFactory nkcFactory; public MToNRangePartitionMergingConnectorDescriptor(IConnectorDescriptorRegistry spec, - ITupleRangePartitionComputerFactory tprcf, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories, - INormalizedKeyComputerFactory nkcFactory) { + ITupleRangePartitionComputerFactory tprcf, RangeId rangeId, int[] sortFields, + IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory nkcFactory) { super(spec); this.tprcf = tprcf; + this.rangeId = rangeId; this.sortFields = sortFields; this.comparatorFactories = comparatorFactories; this.nkcFactory = nkcFactory; @@ -61,9 +65,7 @@ public class MToNRangePartitionMergingConnectorDescriptor extends AbstractMToNCo public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc, IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException { - final PartitionRangeDataWriter rangeWriter = new PartitionRangeDataWriter(ctx, nConsumerPartitions, edwFactory, - recordDesc, tprcf.createPartitioner()); - return rangeWriter; + return new PartitionRangeDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tprcf, rangeId); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java index 2338993..6c83abb 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java @@ -34,6 +34,7 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor; +import org.apache.hyracks.dataflow.std.base.RangeId; import org.apache.hyracks.dataflow.std.collectors.IPartitionBatchManager; import org.apache.hyracks.dataflow.std.collectors.NonDeterministicPartitionBatchManager; import org.apache.hyracks.dataflow.std.collectors.PartitionCollector; @@ -43,15 +44,17 @@ public class MToNRangePartitioningConnectorDescriptor extends AbstractMToNConnec private static final long serialVersionUID = 1L; private final ITupleRangePartitionComputerFactory trpcf; + private final RangeId rangeId; private final int[] sortFields; private final IBinaryComparatorFactory[] comparatorFactories; private final INormalizedKeyComputerFactory nkcFactory; public MToNRangePartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec, - ITupleRangePartitionComputerFactory trpcf, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories, - INormalizedKeyComputerFactory nkcFactory) { + ITupleRangePartitionComputerFactory trpcf, RangeId rangeId, int[] sortFields, + IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory nkcFactory) { super(spec); this.trpcf = trpcf; + this.rangeId = rangeId; this.sortFields = sortFields; this.comparatorFactories = comparatorFactories; this.nkcFactory = nkcFactory; @@ -61,9 +64,7 @@ public class MToNRangePartitioningConnectorDescriptor extends AbstractMToNConnec public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc, IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException { - final PartitionRangeDataWriter rangeWriter = new PartitionRangeDataWriter(ctx, nConsumerPartitions, edwFactory, - recordDesc, trpcf.createPartitioner()); - return rangeWriter; + return new PartitionRangeDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, trpcf, rangeId); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java index f8240b8..2740a60 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java @@ -23,25 +23,38 @@ import java.nio.ByteBuffer; import org.apache.hyracks.api.comm.IPartitionWriterFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputer; +import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.storage.IGrowableIntArray; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.std.base.RangeId; +import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState; import org.apache.hyracks.storage.common.arraylist.IntArrayList; public class PartitionRangeDataWriter extends AbstractPartitionDataWriter { - private final ITupleRangePartitionComputer tpc; + private final ITupleRangePartitionComputerFactory trpcf; + private final RangeId rangeId; private final IGrowableIntArray map; + private ITupleRangePartitionComputer tpc; public PartitionRangeDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount, - IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor, ITupleRangePartitionComputer tpc) - throws HyracksDataException { + IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor, + ITupleRangePartitionComputerFactory trpcf, RangeId rangeId) throws HyracksDataException { super(ctx, consumerPartitionCount, pwFactory, recordDescriptor); - this.tpc = tpc; + this.trpcf = trpcf; + this.rangeId = rangeId; this.map = new IntArrayList(8, 8); } @Override + public void open() throws HyracksDataException { + super.open(); + RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(rangeId); + tpc = trpcf.createPartitioner(rangeState.getRangeMap()); + } + + @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { if (!allocatedFrame) { allocateFrames(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java index 1087cf5..850bf56 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java @@ -20,12 +20,13 @@ package org.apache.hyracks.dataflow.std.join; import java.io.Serializable; +import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType; public interface IMergeJoinCheckerFactory extends Serializable { - IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition) throws HyracksDataException; + IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) throws HyracksDataException; RangePartitioningType getLeftPartitioningType(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java index 649247e..6f0b33b 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java @@ -203,7 +203,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor { locks.setPartitions(nPartitions); RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); final IMergeJoinChecker mjc = mergeJoinCheckerFactory.createMergeJoinChecker(leftKeys, rightKeys, - partition); + partition, null); return new RightDataOperator(ctx, partition, inRecordDesc, mjc); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java index 7ca2542..15df580 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java @@ -20,8 +20,9 @@ package org.apache.hyracks.dataflow.std.join; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType; public class NaturalMergeJoinCheckerFactory implements IMergeJoinCheckerFactory { private static final long serialVersionUID = 1L; @@ -32,7 +33,7 @@ public class NaturalMergeJoinCheckerFactory implements IMergeJoinCheckerFactory } @Override - public IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition) { + public IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) { final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length]; for (int i = 0; i < comparatorFactories.length; ++i) { comparators[i] = comparatorFactories[i].createBinaryComparator(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java index 15d91be..067246d 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java @@ -19,30 +19,23 @@ package org.apache.hyracks.dataflow.std.misc; import java.nio.ByteBuffer; -import java.util.List; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.ActivityId; import org.apache.hyracks.api.dataflow.IActivityGraphBuilder; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; -import org.apache.hyracks.api.dataflow.TaskId; +import org.apache.hyracks.api.dataflow.value.IRangeMap; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; -import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; import org.apache.hyracks.dataflow.std.base.AbstractActivityNode; import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; import org.apache.hyracks.dataflow.std.base.AbstractStateObject; -import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; -import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable; -import org.apache.hyracks.dataflow.std.sort.ISorter; +import org.apache.hyracks.dataflow.std.base.RangeId; public class RangeForwardOperatorDescriptor extends AbstractOperatorDescriptor { private static final long serialVersionUID = 1L; @@ -50,11 +43,15 @@ public class RangeForwardOperatorDescriptor extends AbstractOperatorDescriptor { private static final int RANGE_FORWARD_ACTIVITY_ID = 0; private static final int RANGE_WRITER_ACTIVITY_ID = 1; + private final RangeId rangeId; private final IRangeMap rangeMap; - public RangeForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, IRangeMap rangeMap) { + public RangeForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, RangeId rangeId, IRangeMap rangeMap, + RecordDescriptor recordDescriptor) { super(spec, 1, 1); + this.rangeId = rangeId; this.rangeMap = rangeMap; + recordDescriptors[0] = recordDescriptor; } @Override @@ -68,8 +65,8 @@ public class RangeForwardOperatorDescriptor extends AbstractOperatorDescriptor { public static class RangeForwardTaskState extends AbstractStateObject { private IRangeMap rangeMap; - public RangeForwardTaskState(JobId jobId, TaskId taskId, IRangeMap rangeMap) { - super(jobId, taskId); + public RangeForwardTaskState(JobId jobId, RangeId rangeId, IRangeMap rangeMap) { + super(jobId, rangeId); this.rangeMap = rangeMap; } @@ -93,8 +90,7 @@ public class RangeForwardOperatorDescriptor extends AbstractOperatorDescriptor { @Override public void open() throws HyracksDataException { - state = new RangeForwardTaskState(ctx.getJobletContext().getJobId(), - new TaskId(getActivityId(), partition), rangeMap); + state = new RangeForwardTaskState(ctx.getJobletContext().getJobId(), rangeId, rangeMap); ctx.setStateObject(state); writer.open(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java index c3adc20..273d5ba 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java @@ -19,9 +19,6 @@ package org.apache.hyracks.dataflow.std.sort; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.logging.Level; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java index 2cf166b..637e195 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java @@ -32,10 +32,12 @@ import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory; +import org.apache.hyracks.api.dataflow.value.IRangeMap; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputer; import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.storage.IGrowableIntArray; import org.apache.hyracks.data.std.accessors.PointableBinaryRangeAscComparatorFactory; @@ -48,8 +50,6 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameFixedFieldTupleAppender; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer; import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType; import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap; import org.apache.hyracks.storage.common.arraylist.IntArrayList; import org.apache.hyracks.test.support.TestUtils; @@ -143,8 +143,8 @@ public class FieldRangePartitionComputerFactoryTest extends TestCase { IHyracksTaskContext ctx = TestUtils.create(FRAME_SIZE); int[] rangeFields = new int[] { 0 }; ITupleRangePartitionComputerFactory frpcf = new FieldRangePartitionComputerFactory(rangeFields, - comparatorFactories, rangeMap, rangeType); - ITupleRangePartitionComputer partitioner = frpcf.createPartitioner(); + comparatorFactories, rangeType); + ITupleRangePartitionComputer partitioner = frpcf.createPartitioner(rangeMap); IFrameTupleAccessor accessor = new FrameTupleAccessor(RecordDesc); ByteBuffer buffer = prepareData(ctx, integers);