@Till thanks for your reply. My code is similar to HashTableITCase.testInMemoryMutableHashTable(). It just uses the MutableHashTable class; there is no other Flink configuration involved. The main code body is:
this.recordBuildSideAccessor = RecordSerializer.get();
this.recordProbeSideAccessor = RecordSerializer.get();
final int[] buildKeyPos = new int[]{buildSideJoinIndex};
final int[] probeKeyPos = new int[]{probeSideJoinIndex};
final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);

Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);

List<MemorySegment> memorySegments;
int pageSize = hashTableMemoryManager.getTotalNumPages();
try {
    memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
}
catch (MemoryAllocationException e) {
    LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
    Throwables.propagate(e);
    return;
}

try {
    Stopwatch stopwatch = Stopwatch.createStarted();
    UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
    UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
    join = new MutableHashTable<Record, Record>(
            recordBuildSideAccessor,
            recordProbeSideAccessor,
            recordBuildSideComparator,
            recordProbeSideComparator,
            pactRecordComparator,
            memorySegments,
            ioManager
    );
    join.open(buildInput, probeInput);

    LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");

The BytesValue type is a self-defined Value which holds a byte[]; just like the original StringValue, it has the same serDe performance.
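A BytesValue like the one above would implement Flink's Value interface (write/read against DataOutputView/DataInputView). As a point of reference, here is a minimal, hypothetical sketch of the length-prefixed encoding such a type typically uses, written against plain java.io streams so it is self-contained (the class name and layout are illustrative, not the actual code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

// Illustrative sketch of a BytesValue-style wrapper: length-prefixed byte[]
// serialization, analogous to what a Flink Value's write(DataOutputView) /
// read(DataInputView) pair would do. Plain java.io interfaces are used here
// so the sketch compiles without Flink on the classpath.
class BytesValueSketch {
    private byte[] bytes;

    BytesValueSketch() { this.bytes = new byte[0]; }
    BytesValueSketch(byte[] bytes) { this.bytes = bytes; }

    byte[] getBytes() { return bytes; }

    // Write the length first, then the raw bytes.
    void write(DataOutput out) throws IOException {
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    // Read the length, then fill (reallocating only when the size changed),
    // so one instance can be reused across many records.
    void read(DataInput in) throws IOException {
        int len = in.readInt();
        if (bytes.length != len) {
            bytes = new byte[len];
        }
        in.readFully(bytes);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof BytesValueSketch
                && Arrays.equals(bytes, ((BytesValueSketch) o).bytes);
    }

    @Override
    public int hashCode() { return Arrays.hashCode(bytes); }
}
```

Since the JProfiler output below points at Record.serialize as the dominant cost, the per-record length prefix plus array copy in exactly this kind of loop is a plausible place for that time to go.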
while (join.nextRecord()) {
    Record currentProbeRecord = join.getCurrentProbeRecord();
    MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
    while (buildSideIterator.next(reusedBuildSideRow) != null) {
        materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
        materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
        rowNum++;
    }
}

I have tried both the Record and Row classes as the record type, without any notable performance improvement. I also tried batching the input records: the buildInput and probeInput variables of the first code block iterate one Record at a time out of a batch of Records. The batched records' content stays in memory in Drill's ValueVector format; once a record needs to participate in the build or probe phase via an iterator next() call, it is fetched from the in-memory ValueVector content. But there were no performance gains. The top hotspot profile from JProfiler is below:

Hot spot                                                              Self time (microseconds)   Average time   Invocations
org.apache.flink.types.Record.serialize                               1014127                    n/a            n/a
org.apache.flink.types.Record.deserialize                             60684                      n/a            n/a
org.apache.flink.types.Record.copyTo                                  83007                      n/a            n/a
org.apache.flink.runtime.operators.hash.MutableHashTable.open         55238                      n/a            n/a
org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord   10955                      n/a            n/a
org.apache.flink.runtime.memory.MemoryManager.release                 33484                      n/a            n/a
org.apache.flink.runtime.memory.MemoryManager.allocatePages           104259                     n/a            n/a

My log shows that the hash join's open() method costs too much time:

construct hash table elapsed:1885ms

On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Weijie,
>
> it might be the case that batching the processing of multiple rows can
> give you an improved performance compared to single row processing.
>
> Maybe you could share the exact benchmark baseline results and the code
> you use to test Flink's MutableHashTable with us. Also the Flink
> configuration and how you run it would be of interest. That way we might be
> able to see if we can tune Flink a bit more.
>
> Cheers,
> Till
>
> On Sun, May 14, 2017 at 5:23 AM, weijie tong <tongweijie...@gmail.com>
> wrote:
>
>> I have a test case that uses Flink's MutableHashTable class to do a hash
>> join on a local machine with 64g memory and 64 cores. The test case is one
>> build table with 140k rows and one probe table with 3.2M rows; the matched
>> result is 120k rows.
>>
>> It takes 2.2 seconds to complete the join. The performance seems bad. I
>> made sure there is no spilling to disk, and the smaller table is the build
>> side. The MutableObjectIterator is a sequence of Rows; each Row is composed
>> of several fields which are byte[]. From my log I find the open() method
>> takes 1.560 seconds and the probe phase takes 680ms. My JProfiler profile
>> shows the MutableObjectIterator's next() method call is the hotspot.
>>
>>
>> I want to know how to tune this scenario. I find Drill's HashJoin is
>> batch-model: its build side's input is a RecordBatch which holds a batch of
>> rows whose memory size approaches the L2 cache size. Through this strategy
>> it needs fewer method calls (that is, calls to next()) and is much more
>> CPU-efficient. SQL Server's paper also notes the batch model's performance
>> gains (https://www.microsoft.com/en-
>> us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf).
>> I guess the performance drop is due to the single-row iteration model.
>>
>>
>> I hope someone can correct my opinion, and maybe I am using the
>> MutableHashTable wrongly. I'm waiting for advice.
>>
>
>
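For reference, the batched-iteration idea discussed in this thread (pulling a block of records per call instead of one, as in Drill's RecordBatch or SQL Server's Apollo batch model) can be sketched generically. The names here (BatchingIterator, batchSize) are illustrative only and are not Flink API or the original code:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative sketch (not Flink API): wrap a single-record iterator so the
// consumer pulls a whole batch per call, amortizing the per-record virtual
// method-call overhead that single-row next() iteration incurs.
class BatchingIterator<T> {
    private final Iterator<T> source;
    private final int batchSize;

    BatchingIterator(Iterator<T> source, int batchSize) {
        this.source = source;
        this.batchSize = batchSize;
    }

    // Returns up to batchSize records; an empty list means the source is
    // exhausted. The consumer then iterates the batch with a tight,
    // monomorphic loop instead of one virtual call per record.
    List<T> nextBatch() {
        List<T> batch = new ArrayList<>(batchSize);
        while (batch.size() < batchSize && source.hasNext()) {
            batch.add(source.next());
        }
        return batch;
    }
}
```

Whether this wrapper alone would help here is an open question: it reduces call overhead on the consuming side, but the profile above suggests most of the time goes into Record.serialize inside the hash table itself, which a batching facade does not touch.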