abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1375
Change subject: Add Linear Hashing Experiment ...................................................................... Add Linear Hashing Experiment Change-Id: Iffc251dd5f8825de6543d8118bf9fef76282a1d2 --- M asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java M asterixdb/asterix-tools/pom.xml A asterixdb/asterix-tools/src/main/java/org/apache/asterix/tools/distribution/DistributionRunner.java A asterixdb/asterix-tools/src/main/java/org/apache/asterix/tools/distribution/DistributionSimulator.java A asterixdb/asterix-tools/src/main/java/org/apache/asterix/tools/distribution/FieldLinearHashPartitionComputerFactory.java A asterixdb/asterix-tools/src/main/java/org/apache/asterix/tools/distribution/SingleTupleFrameAcessor.java 6 files changed, 400 insertions(+), 1 deletion(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/75/1375/1 diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java index 7c6556b..1e1dbe2 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java @@ -132,7 +132,7 @@ } @Override - public Integer next() throws IOException { + public Integer next() { generate(); return value; } @@ -145,6 +145,13 @@ out.writeInt(value); } + public void write(int value, DataOutput out) throws IOException { + if (tagged) { + out.writeByte(ATypeTag.SERIALIZED_INT32_TYPE_TAG); + } + out.writeInt(value); + } + @Override public Integer get() throws IOException { return value; diff --git a/asterixdb/asterix-tools/pom.xml b/asterixdb/asterix-tools/pom.xml index c039c48..08988d0 100644 --- a/asterixdb/asterix-tools/pom.xml +++ b/asterixdb/asterix-tools/pom.xml @@ -69,6 +69,12 @@ <dependencies> <dependency> <groupId>org.apache.asterix</groupId> + <artifactId>asterix-app</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.asterix</groupId> <artifactId>asterix-lang-aql</artifactId> <version>${project.version}</version> <scope>test</scope> diff --git a/asterixdb/asterix-tools/src/main/java/org/apache/asterix/tools/distribution/DistributionRunner.java b/asterixdb/asterix-tools/src/main/java/org/apache/asterix/tools/distribution/DistributionRunner.java new file mode 100644 index 0000000..c746637 --- /dev/null +++ b/asterixdb/asterix-tools/src/main/java/org/apache/asterix/tools/distribution/DistributionRunner.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.tools.distribution; + +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; +import java.util.zip.CRC32; + +import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; +import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory; + +/** + * Entry point for the distribution simulation + */ +public class DistributionRunner { + // 10 millions + private static final int NUM_RECORDS = 10000000; + // Variable Number of Storage Partitions + private static final int[] NUM_PARTITIONS = { 2, 3, 4, 5, 6, 8, 10, 16, 25, 32, 40, 50, 64, 70, 80, 90, 100, 120, + 128, 150, 200, 256, 300, 400, 512, 600, 700, 800, 900, 1024, 1050, 1200, 1400, 1600, 1800, 2000, 2048 }; + + private DistributionRunner() { + } + + public static void main(String[] args) throws IOException { + Pair<String, ITuplePartitionComputer>[] partitioners = partitioners(); + DistributionSimulator simulator = new DistributionSimulator(NUM_RECORDS, NUM_PARTITIONS, partitioners); + PrintStream os = new PrintStream(new File("target" + File.separator + "distribution.txt")); + simulator.simulate(os); + } + + @SuppressWarnings("unchecked") + private static Pair<String, ITuplePartitionComputer>[] partitioners() { + List<Pair<String, ITuplePartitionComputer>> partitioners = new ArrayList<>(); + /* + * Hash MOD P with MurMur + */ + FieldHashPartitionComputerFactory partiionerFactory = new FieldHashPartitionComputerFactory(new int[] { 0 }, + new IBinaryHashFunctionFactory[] { BinaryHashFunctionFactoryProvider.INTEGER_POINTABLE_INSTANCE }); + partitioners.add(new Pair<>("MurMur Hash MOD P", partiionerFactory.createPartitioner())); + /* + * Hash MOD P with CRC32 + */ + final CRC32 crc32 = new CRC32(); + ITuplePartitionComputer crc32Partitioner = (IFrameTupleAccessor accessor, int tIndex, int nParts) -> { + return crc32(crc32, accessor.getBuffer().array(), accessor.getFieldStartOffset(0, 0) + 1, accessor + .getFieldLength(0, 0) - 1) % nParts; + }; + partitioners.add(new Pair<>("CRC32 Hash MOD P", crc32Partitioner)); + /* + * MurMur with Linear Hashing + */ + FieldLinearHashPartitionComputerFactory linearPartitionerFactory = new FieldLinearHashPartitionComputerFactory( + new int[] { 0 }, + new IBinaryHashFunctionFactory[] { BinaryHashFunctionFactoryProvider.INTEGER_POINTABLE_INSTANCE }); + partitioners.add(new Pair<>("MurMur with Linear Hashing", linearPartitionerFactory.createPartitioner())); + /* + * CRC32 with Linear Hashing + */ + ITuplePartitionComputer cpLinearHashing = (IFrameTupleAccessor accessor, int tIndex, int nParts) -> { + int partition = 0; + int h = crc32(crc32, accessor.getBuffer().array(), accessor.getFieldStartOffset(0, 0) + 1, accessor + .getFieldLength( + 0, 0) - 1); + int level = (int) Math.floor(Math.log(nParts) / Math.log(2)); + int base = (int) Math.pow(2, level); + int next = nParts - base; + partition = h % base; + if (partition < next) { + int nextBase = (int) Math.pow(2, level + 1.0); + partition = h % nextBase; + } + return partition; + }; + partitioners.add(new Pair<>("CRC32 with Linear Hashing", cpLinearHashing)); + return partitioners.toArray(new Pair[partitioners.size()]); + } + + private static int crc32(CRC32 crc32, byte[] key, int offset, int length) { + crc32.reset(); + crc32.update(key, offset, length); + long rv = (crc32.getValue() >> 16) & 0x7fff; + return (int) rv; + } +} diff --git a/asterixdb/asterix-tools/src/main/java/org/apache/asterix/tools/distribution/DistributionSimulator.java b/asterixdb/asterix-tools/src/main/java/org/apache/asterix/tools/distribution/DistributionSimulator.java new file mode 100644 index 0000000..252aae6 --- /dev/null +++ b/asterixdb/asterix-tools/src/main/java/org/apache/asterix/tools/distribution/DistributionSimulator.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.tools.distribution; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.Arrays; +import java.util.HashSet; + +import org.apache.asterix.app.data.gen.AInt32FieldValueGenerator; +import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction; +import org.apache.asterix.test.common.TestTupleReference; +import org.apache.commons.math.stat.descriptive.SummaryStatistics; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; + +/** + * A class that simulate the load balancing effect of a distribution function on a clauster of different partition sizes + * It tests the function with linear hashing expansion and with uniform distribution + */ +public class DistributionSimulator { + private final int numRecords; + private final int[] numPartitions; + private final Pair<String, ITuplePartitionComputer>[] tuplePartitionComputers; + private final AInt32FieldValueGenerator keyGenerator; + private final HashSet<Integer> keys; + private final TestTupleReference tuple; + private final SingleTupleFrameAcessor accessor; + + public DistributionSimulator(int numRecords, int[] numPartitions, + Pair<String, ITuplePartitionComputer>[] tuplePartitionComputers) { + this.numRecords = numRecords; + this.numPartitions = numPartitions; + this.tuplePartitionComputers = tuplePartitionComputers; + keyGenerator = new AInt32FieldValueGenerator(GenerationFunction.RANDOM, false, true); + keys = new HashSet<>(); + tuple = new TestTupleReference(1); + accessor = new SingleTupleFrameAcessor(tuple, 5); + } + + public void simulate(PrintStream out) throws IOException { + for (int i = 0; i < numRecords; i++) { + Integer key = keyGenerator.next(); + while (keys.contains(key)) { + key = keyGenerator.next(); + } + keys.add(key); + } + for (int p : numPartitions) { + int[][] partitions = new int[tuplePartitionComputers.length][p]; + for (int key : keys) { + tuple.reset(); + keyGenerator.write(key, tuple.getFields()[0].getDataOutput()); + accessor.refresh(); + for (int j = 0; j < tuplePartitionComputers.length; j++) { + int partition = tuplePartitionComputers[j].second.partition(accessor, 0, p); + partitions[j][partition]++; + } + } + print(out, partitions); + } + } + + private void print(PrintStream out, int[][] partitions) { + out.println("================================================================="); + out.println("Number of records: " + numRecords); + out.println("Number of partitions: " + partitions[0].length); + for (int i = 0; i < partitions.length; i++) { + out.println((i + 1) + ". " + tuplePartitionComputers[i].first); + SummaryStatistics stats = stats(partitions[i]); + out.println(stats); + out.println("Sorted Distribution"); + Arrays.sort(partitions[i]); + for (int j = partitions[i].length - 1; j >= 0; j--) { + out.print(partitions[i][j]); + if (j > 0) { + out.print(", "); + } + } + out.println(); + if (i + 1 < partitions.length) { + out.println("####################"); + out.println("####################"); + } + } + out.println("================================================================="); + } + + private SummaryStatistics stats(int[] values) { + SummaryStatistics stats = new SummaryStatistics(); + for (int i : values) { + stats.addValue(i); + } + return stats; + } +} diff --git a/asterixdb/asterix-tools/src/main/java/org/apache/asterix/tools/distribution/FieldLinearHashPartitionComputerFactory.java b/asterixdb/asterix-tools/src/main/java/org/apache/asterix/tools/distribution/FieldLinearHashPartitionComputerFactory.java new file mode 100644 index 0000000..6994fa3 --- /dev/null +++ b/asterixdb/asterix-tools/src/main/java/org/apache/asterix/tools/distribution/FieldLinearHashPartitionComputerFactory.java @@ -0,0 +1,60 @@ +package org.apache.asterix.tools.distribution; + +import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction; +import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class FieldLinearHashPartitionComputerFactory implements ITuplePartitionComputerFactory { + private static final long serialVersionUID = 1L; + private final int[] hashFields; + private final IBinaryHashFunctionFactory[] hashFunctionFactories; + + public FieldLinearHashPartitionComputerFactory(int[] hashFields, + IBinaryHashFunctionFactory[] hashFunctionFactories) { + this.hashFields = hashFields; + this.hashFunctionFactories = hashFunctionFactories; + } + + @Override + public ITuplePartitionComputer createPartitioner() { + final IBinaryHashFunction[] hashFunctions = new IBinaryHashFunction[hashFunctionFactories.length]; + for (int i = 0; i < hashFunctionFactories.length; ++i) { + hashFunctions[i] = hashFunctionFactories[i].createBinaryHashFunction(); + } + return new ITuplePartitionComputer() { + @Override + public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException { + if (nParts == 1) { + return 0; + } + int h = 0; + int startOffset = accessor.getTupleStartOffset(tIndex); + int slotLength = accessor.getFieldSlotsLength(); + for (int j = 0; j < hashFields.length; ++j) { + int fIdx = hashFields[j]; + IBinaryHashFunction hashFn = hashFunctions[j]; + int fStart = accessor.getFieldStartOffset(tIndex, fIdx); + int fEnd = accessor.getFieldEndOffset(tIndex, fIdx); + int fh = hashFn + .hash(accessor.getBuffer().array(), startOffset + slotLength + fStart, fEnd - fStart); + h = h * 31 + fh; + } + if (h < 0) { + h = -(h + 1); + } + int level = (int) Math.floor(Math.log(nParts) / Math.log(2)); + int base = (int) Math.pow(2, level); + int next = nParts - base; + int partition = h % base; + if (partition < next) { + int nextBase = (int) Math.pow(2, level + 1.0); + partition = h % nextBase; + } + return partition; + } + }; + } +} diff --git a/asterixdb/asterix-tools/src/main/java/org/apache/asterix/tools/distribution/SingleTupleFrameAcessor.java b/asterixdb/asterix-tools/src/main/java/org/apache/asterix/tools/distribution/SingleTupleFrameAcessor.java new file mode 100644 index 0000000..320f137 --- /dev/null +++ b/asterixdb/asterix-tools/src/main/java/org/apache/asterix/tools/distribution/SingleTupleFrameAcessor.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.tools.distribution; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; + +public class SingleTupleFrameAcessor implements IFrameTupleAccessor { + + private final ITupleReference tuple; + private final ByteBuffer buffer; + + public SingleTupleFrameAcessor(ITupleReference tuple, int maxSize) { + this.tuple = tuple; + buffer = ByteBuffer.allocate(maxSize); + + } + + @Override + public int getFieldCount() { + return tuple.getFieldCount(); + } + + @Override + public int getFieldSlotsLength() { + return 0; + } + + @Override + public int getFieldEndOffset(int tupleIndex, int fIdx) { + return tuple.getFieldStart(fIdx) + tuple.getFieldLength(fIdx); + } + + @Override + public int getFieldStartOffset(int tupleIndex, int fIdx) { + return tuple.getFieldStart(fIdx); + } + + public void refresh() { + buffer.clear(); + int destPos = 0; + for (int i = 0; i < tuple.getFieldCount(); i++) { + System.arraycopy(tuple.getFieldData(i), tuple.getFieldStart(i), buffer.array(), destPos, tuple + .getFieldLength(i)); + destPos += tuple.getFieldLength(i); + } + } + + @Override + public int getFieldLength(int tupleIndex, int fIdx) { + return tuple.getFieldLength(fIdx); + } + + @Override + public int getTupleLength(int tupleIndex) { + return -1; + } + + @Override + public int getTupleEndOffset(int tupleIndex) { + return -1; + } + + @Override + public int getTupleStartOffset(int tupleIndex) { + return 0; + } + + @Override + public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) { + return 0; + } + + @Override + public int getTupleCount() { + return 1; + } + + @Override + public ByteBuffer getBuffer() { + return buffer; + } + + @Override + public void reset(ByteBuffer buffer) { + // Do nothing! + } +} -- To view, visit https://asterix-gerrit.ics.uci.edu/1375 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Iffc251dd5f8825de6543d8118bf9fef76282a1d2 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>