@Till thanks for your reply.

My code is similar to HashTableITCase.testInMemoryMutableHashTable().
It uses only the MutableHashTable class; there is no other Flink
configuration involved. The main body of the code is:

> this.recordBuildSideAccessor = RecordSerializer.get();
> this.recordProbeSideAccessor = RecordSerializer.get();
> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
> List<MemorySegment> memorySegments;
> int pageSize = hashTableMemoryManager.getTotalNumPages();
> try {
>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
> }
> catch (MemoryAllocationException e) {
>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>   Throwables.propagate(e);
>   return;
> }
> try {
>   Stopwatch stopwatch = Stopwatch.createStarted();
>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>   join = new MutableHashTable<Record, Record>(
>       recordBuildSideAccessor,
>       recordProbeSideAccessor,
>       recordBuildSideComparator,
>       recordProbeSideComparator,
>       pactRecordComparator,
>       memorySegments,
>       ioManager
>   );
>   join.open(buildInput, probeInput);
>
>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>
>
BytesValue is a self-defined type that holds a byte[]. It is modeled on
the original StringValue and has the same serialization/deserialization
performance.


while (join.nextRecord()) {
  Record currentProbeRecord = join.getCurrentProbeRecord();
  MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
  while (buildSideIterator.next(reusedBuildSideRow) != null) {
    materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
    materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
    rowNum++;
  }
}




I have tried both the Record and the Row class as the record type, without
any improvement in performance. I also tried batching the input records.
That is, the buildInput and probeInput iterators in the first code block
return one Record at a time, read from a batch of records whose content
stays in memory in Drill's ValueVector format. When a record is needed for
the build or probe phase, a call to the iterator's next() fetches it from
the in-memory ValueVector batch. This gave no performance gain either.
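
A minimal sketch of such a batch-backed iterator is below, to make the setup concrete. It is not the actual test code: ReusingIterator is a hypothetical local stand-in for the next(reuse) shape of Flink's MutableObjectIterator, and ByteRow and ColumnBatch are hypothetical simplifications of a row and of a columnar batch (they are not Drill's ValueVector API).

```java
// Hypothetical stand-in with the same next(reuse) shape as Flink's
// MutableObjectIterator, so the sketch compiles without Flink.
interface ReusingIterator<T> {
    /** Fills and returns {@code reuse}, or returns null when the input is exhausted. */
    T next(T reuse);
}

/** Hypothetical row holder: one byte[] per field. */
final class ByteRow {
    byte[][] fields;
}

/** Hypothetical columnar batch: columns[c][r] is the value of column c in row r. */
final class ColumnBatch {
    final byte[][][] columns;
    final int rowCount;

    ColumnBatch(byte[][][] columns, int rowCount) {
        this.columns = columns;
        this.rowCount = rowCount;
    }
}

/** Hands out one row per call from a sequence of in-memory columnar batches. */
final class BatchBackedIterator implements ReusingIterator<ByteRow> {
    private final ColumnBatch[] batches;
    private int batchIdx;
    private int rowIdx;

    BatchBackedIterator(ColumnBatch... batches) {
        this.batches = batches;
    }

    @Override
    public ByteRow next(ByteRow reuse) {
        // Skip batches that are exhausted or empty.
        while (batchIdx < batches.length && rowIdx >= batches[batchIdx].rowCount) {
            batchIdx++;
            rowIdx = 0;
        }
        if (batchIdx == batches.length) {
            return null;
        }
        ColumnBatch batch = batches[batchIdx];
        int numCols = batch.columns.length;
        if (reuse.fields == null || reuse.fields.length != numCols) {
            reuse.fields = new byte[numCols][];
        }
        // Point the reused row at the current row's values; no bytes are copied here.
        for (int c = 0; c < numCols; c++) {
            reuse.fields[c] = batch.columns[c][rowIdx];
        }
        rowIdx++;
        return reuse;
    }
}
```

With this shape the consumer still makes one next() call per row, so whatever the hash table does per record, such as serializing it into its memory segments, is unchanged by how the rows are stored upstream. That would be consistent with batching the input alone not helping.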


The top hot spots reported by JProfiler are below:

Hot spot,"Self time (microseconds)","Average Time","Invocations"
org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
org.apache.flink.runtime.operators.hash.MutableHashTable.open,55238,"n/a","n/a"
org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord,10955,"n/a","n/a"
org.apache.flink.runtime.memory.MemoryManager.release,33484,"n/a","n/a"
org.apache.flink.runtime.memory.MemoryManager.allocatePages,104259,"n/a","n/a"


My log shows that the join.open() call takes most of the time:

construct hash table elapsed:1885ms
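
To put the profile next to the logged time, the share of each hot spot can be computed as in the sketch below. The self times are copied from the JProfiler export above; the class name HotspotShare is made up for this example. Comparing Record.serialize with the 1885 ms rests on the assumption that all serialization happens inside open(), which the profile itself does not show, and profiled times include profiler overhead.

```java
/** Computes each hot spot's share of the listed self time (illustrative only). */
final class HotspotShare {
    static final String[] NAMES = {
        "Record.serialize", "Record.deserialize", "Record.copyTo",
        "MutableHashTable.open", "MutableHashTable.nextRecord",
        "MemoryManager.release", "MemoryManager.allocatePages"
    };

    // Self times in microseconds, in the same order as NAMES.
    static final long[] SELF_MICROS = {
        1014127L, 60684L, 83007L, 55238L, 10955L, 33484L, 104259L
    };

    // Logged wall-clock time of open(), converted from 1885 ms to microseconds.
    static final long OPEN_ELAPSED_MICROS = 1885L * 1000L;

    static long total(long[] values) {
        long sum = 0;
        for (long v : values) {
            sum += v;
        }
        return sum;
    }

    static double percent(long part, long whole) {
        return 100.0 * part / whole;
    }

    public static void main(String[] args) {
        long listed = total(SELF_MICROS);
        for (int i = 0; i < NAMES.length; i++) {
            System.out.printf("%-28s %9d us %5.1f%% of listed self time%n",
                    NAMES[i], SELF_MICROS[i], percent(SELF_MICROS[i], listed));
        }
        // Assumption: every Record.serialize call happens during open().
        System.out.printf("Record.serialize vs. logged open(): %.1f%%%n",
                percent(SELF_MICROS[0], OPEN_ELAPSED_MICROS));
    }
}
```

On these numbers, Record.serialize accounts for roughly 74% of the listed self time, and for roughly 54% of the logged 1885 ms if the assumption above holds.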




On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Weijie,
>
> it might be the case that batching the processing of multiple rows can
> give you an improved performance compared to single row processing.
>
> Maybe you could share the exact benchmark base line results and the code
> you use to test Flink's MutableHashTable with us. Also the Flink
> configuration and how you run it would be of interest. That way we might be
> able to see if we can tune Flink a bit more.
>
> Cheers,
> Till
>
> On Sun, May 14, 2017 at 5:23 AM, weijie tong <tongweijie...@gmail.com>
> wrote:
>
>> I have a test case that uses Flink's MutableHashTable class to do a hash
>> join on a local machine with 64 GB of memory and 64 cores. The test case
>> has one build table with 140,000 rows and one probe table with 3.2 million
>> rows; the join produces 120,000 matched rows.
>>
>> It takes 2.2 seconds to complete the join, which seems slow. I made sure
>> there is no overflow and that the smaller table is the build side. The
>> MutableObjectIterator is a sequence of Rows. Each Row is composed of
>> several fields, which are byte[]. From my log, I find that the open()
>> method takes 1.560 seconds and the probe iteration phase takes 680 ms. My
>> JProfiler profile shows that the MutableObjectIterator's next() method is
>> the hotspot.
>>
>>
>> I want to know how to tune this scenario. I found that Drill's HashJoin
>> uses a batch model: its build-side input is a RecordBatch, which holds a
>> batch of rows whose memory size is close to the L2 cache size. This
>> strategy results in fewer method calls (that is, fewer calls to next())
>> and makes more efficient use of the CPU. I also found that a SQL Server
>> paper describes the performance gains of the batch model
>> (https://www.microsoft.com/en-us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf).
>> I guess the poor performance is due to the single-row iteration model.
>>
>>
>> I hope someone can correct me if I am wrong. I may also be using
>> MutableHashTable incorrectly, so any advice is welcome.
>>
>
>
