Thanks for all of your enthusiastic responses. Yes, my goal was to find
the best in-memory performance, and I got the answer I needed.

On Tue, 16 May 2017 at 4:10 PM Fabian Hueske <fhue...@gmail.com> wrote:
> Hi,
>
> Flink's HashJoin implementation was designed to gracefully handle
> inputs that exceed the main memory. It is not explicitly optimized for
> in-memory processing and does not play fancy tricks like optimizing
> cache accesses or batching. I assume your benchmark is about in-memory
> joins only; robustness, not in-memory speed, was the main design goal
> when the join was implemented. Since most of the development of Flink
> currently focuses on streaming applications, the join implementation
> has barely been touched in recent years (except for minor extensions
> and bugfixes).
>
> Regarding your tests, Tuple should give better performance than Row,
> because Row is null-sensitive and serializes a null-mask. There is
> also a blog post about Flink's join performance [1] which is already a
> bit dusty, but as I said, the algorithm hasn't changed much since then.
>
> Best, Fabian
>
> [1]
> https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
>
> 2017-05-15 16:26 GMT+02:00 weijie tong <tongweijie...@gmail.com>:
>
>> The Flink version is 1.2.0
>>
>> On Mon, May 15, 2017 at 10:24 PM, weijie tong
>> <tongweijie...@gmail.com> wrote:
>>
>>> @Till thanks for your reply.
>>>
>>> My code is similar to HashTableITCase.testInMemoryMutableHashTable().
>>> It just uses the MutableHashTable class; there is no other Flink
>>> configuration involved. The main code body is:
>>>
>>>> this.recordBuildSideAccessor = RecordSerializer.get();
>>>> this.recordProbeSideAccessor = RecordSerializer.get();
>>>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>>>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>>>> final Class<? extends Value>[] keyType =
>>>>     (Class<? extends Value>[]) new Class[]{BytesValue.class};
>>>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>>>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>>>> this.pactRecordComparator =
>>>>     new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>>>>
>>>> Sequence<Record> buildSideRecordsSeq =
>>>>     makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>>>> Sequence<Record> probeSideRecordsSeq =
>>>>     makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>>>>
>>>> List<MemorySegment> memorySegments;
>>>> // note: this is a number of pages, not a page size in bytes
>>>> int pageCount = hashTableMemoryManager.getTotalNumPages();
>>>> try {
>>>>     memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageCount);
>>>> }
>>>> catch (MemoryAllocationException e) {
>>>>     LOGGER.error("could not allocate " + pageCount + " pages of memory for HashJoin", e);
>>>>     throw Throwables.propagate(e);
>>>> }
>>>>
>>>> try {
>>>>     Stopwatch stopwatch = Stopwatch.createStarted();
>>>>     UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>>>     UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>>>     join = new MutableHashTable<Record, Record>(
>>>>         recordBuildSideAccessor,
>>>>         recordProbeSideAccessor,
>>>>         recordBuildSideComparator,
>>>>         recordProbeSideComparator,
>>>>         pactRecordComparator,
>>>>         memorySegments,
>>>>         ioManager);
>>>>     // builds the hash table from the build side and prepares the probe side
>>>>     join.open(buildInput, probeInput);
>>>>
>>>>     LOGGER.info("construct hash table elapsed:"
>>>>         + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>>
>>> The BytesValue type is a self-defined one which holds a byte[]; it is
>>> modeled on the original StringValue and has the same serde performance.
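>>>
>>> For reference, a minimal sketch of such a byte[]-backed key type is
>>> below. This is an illustrative reconstruction against the Flink 1.2
>>> Key/Value interfaces, not the actual BytesValue source:
>>>
>>> import java.io.IOException;
>>> import java.util.Arrays;
>>>
>>> import org.apache.flink.core.memory.DataInputView;
>>> import org.apache.flink.core.memory.DataOutputView;
>>> import org.apache.flink.types.Key;
>>>
>>> // Illustrative sketch of a byte[]-backed key usable with
>>> // RecordComparator, analogous to StringValue. Not the real BytesValue.
>>> public class BytesValueSketch implements Key<BytesValueSketch> {
>>>
>>>     private byte[] bytes = new byte[0];
>>>
>>>     public void setBytes(byte[] bytes) {
>>>         this.bytes = bytes;
>>>     }
>>>
>>>     @Override
>>>     public void write(DataOutputView out) throws IOException {
>>>         // length-prefixed payload; every record copy pays this cost
>>>         out.writeInt(bytes.length);
>>>         out.write(bytes);
>>>     }
>>>
>>>     @Override
>>>     public void read(DataInputView in) throws IOException {
>>>         int len = in.readInt();
>>>         bytes = new byte[len];
>>>         in.readFully(bytes);
>>>     }
>>>
>>>     @Override
>>>     public int compareTo(BytesValueSketch other) {
>>>         // lexicographic comparison on unsigned bytes
>>>         int n = Math.min(bytes.length, other.bytes.length);
>>>         for (int i = 0; i < n; i++) {
>>>             int cmp = (bytes[i] & 0xff) - (other.bytes[i] & 0xff);
>>>             if (cmp != 0) {
>>>                 return cmp;
>>>             }
>>>         }
>>>         return bytes.length - other.bytes.length;
>>>     }
>>>
>>>     @Override
>>>     public int hashCode() {
>>>         return Arrays.hashCode(bytes);
>>>     }
>>>
>>>     @Override
>>>     public boolean equals(Object obj) {
>>>         return obj instanceof BytesValueSketch
>>>             && Arrays.equals(bytes, ((BytesValueSketch) obj).bytes);
>>>     }
>>> }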
>>>
>>> The probe phase is:
>>>
>>> while (join.nextRecord()) {
>>>     Record currentProbeRecord = join.getCurrentProbeRecord();
>>>     MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>>>     // join the current probe record with every matching build-side record
>>>     while (buildSideIterator.next(reusedBuildSideRow) != null) {
>>>         materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value,
>>>             buildSideIndex2Vector, rowNum);
>>>         materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value,
>>>             probeSideIndex2Vector, rowNum);
>>>         rowNum++;
>>>     }
>>> }
>>>
>>> I have tried both the Record and the Row class as the record type,
>>> without any improvement. I also tried batching the input records:
>>> the buildInput and probeInput variables of the first code block
>>> iterate one Record at a time out of a batch of Records whose content
>>> stays in memory in Drill's ValueVector format. Whenever a record
>>> needs to participate in the build or probe phase through an
>>> iterator.next() call, it is fetched from the batched in-memory
>>> ValueVector content. But there were no performance gains.
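>>>
>>> In outline, that batched input was a MutableObjectIterator that
>>> drains one in-memory batch at a time, like this simplified sketch
>>> (hypothetical names; my real code reads from ValueVectors rather
>>> than Record[] batches):
>>>
>>> import java.io.IOException;
>>> import java.util.Iterator;
>>>
>>> import org.apache.flink.types.Record;
>>> import org.apache.flink.util.MutableObjectIterator;
>>>
>>> // Simplified sketch of the batched input described above: records are
>>> // pre-materialized in in-memory batches, and next() only hands out
>>> // the current batch element, so per-record work inside next() is small.
>>> public class BatchedRecordIterator implements MutableObjectIterator<Record> {
>>>
>>>     private final Iterator<Record[]> batches;
>>>     private Record[] currentBatch = new Record[0];
>>>     private int pos = 0;
>>>
>>>     public BatchedRecordIterator(Iterator<Record[]> batches) {
>>>         this.batches = batches;
>>>     }
>>>
>>>     @Override
>>>     public Record next(Record reuse) throws IOException {
>>>         while (pos >= currentBatch.length) {
>>>             if (!batches.hasNext()) {
>>>                 return null; // end of input
>>>             }
>>>             currentBatch = batches.next();
>>>             pos = 0;
>>>         }
>>>         // copy into the caller's reuse object, as MutableHashTable expects
>>>         currentBatch[pos++].copyTo(reuse);
>>>         return reuse;
>>>     }
>>>
>>>     @Override
>>>     public Record next() throws IOException {
>>>         return next(new Record());
>>>     }
>>> }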
>>>
>>> The top hotspot profile from JProfiler is below (average time and
>>> invocation counts were reported as n/a):
>>>
>>> Hot spot                                                             Self time (µs)
>>> org.apache.flink.types.Record.serialize                                  1,014,127
>>> org.apache.flink.types.Record.deserialize                                   60,684
>>> org.apache.flink.types.Record.copyTo                                        83,007
>>> org.apache.flink.runtime.operators.hash.MutableHashTable.open               55,238
>>> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord         10,955
>>> org.apache.flink.runtime.memory.MemoryManager.release                       33,484
>>> org.apache.flink.runtime.memory.MemoryManager.allocatePages                104,259
>>>
>>> My log shows that the MutableHashTable.open() call accounts for most
>>> of the time:
>>>
>>> construct hash table elapsed:1885ms
>>>
>>> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann
>>> <trohrm...@apache.org> wrote:
>>>
>>>> Hi Weijie,
>>>>
>>>> it might be the case that batching the processing of multiple rows
>>>> can give you an improved performance compared to single-row
>>>> processing.
>>>>
>>>> Maybe you could share the exact benchmark baseline results and the
>>>> code you use to test Flink's MutableHashTable with us, as well as
>>>> the Flink configuration and how you run it. That way we might be
>>>> able to see if we can tune Flink a bit more.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Sun, May 14, 2017 at 5:23 AM, weijie tong
>>>> <tongweijie...@gmail.com> wrote:
>>>>
>>>>> I have a test case that uses Flink's MutableHashTable class to do a
>>>>> hash join on a local machine with 64 GB of memory and 64 cores. The
>>>>> build table has 140,000 rows, the probe table has 3,200,000 rows,
>>>>> and the join produces 120,000 matched rows.
>>>>>
>>>>> It takes 2.2 seconds to complete the join, which seems bad. I made
>>>>> sure there is no overflow to disk and that the smaller table is the
>>>>> build side. The MutableObjectIterator is a sequence of Rows, where
>>>>> each Row is composed of several byte[] fields. My log shows that
>>>>> the open() method takes 1.560 seconds and the probe phase takes
>>>>> 680 ms, and my JProfiler profile shows that the
>>>>> MutableObjectIterator's next() call is the hotspot.
>>>>>
>>>>> I want to know how to tune this scenario. I guess the slowdown is
>>>>> due to the single-row iteration model. Drill's HashJoin, by
>>>>> contrast, is batch-model: its build side's input is a RecordBatch
>>>>> which holds a batch of rows whose memory size is close to the L2
>>>>> cache. Through this strategy it needs far fewer method calls (that
>>>>> is, calls to next()) and is much more CPU-efficient. The SQL Server
>>>>> column-store paper also reports the batch model's performance
>>>>> gains (
>>>>> https://www.microsoft.com/en-us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf).
>>>>> To make the batch idea concrete, I sketch the kind of structure I
>>>>> mean below.
>>>>>
>>>>> I hope someone can correct my opinion, or point out a wrong use of
>>>>> the MutableHashTable. Any advice is welcome.
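>>>>>
>>>>> A rough sketch of the batch structure (hypothetical names and
>>>>> sizes; not Drill's actual RecordBatch/ValueVector classes):
>>>>>
>>>>> // Rough sketch, in the spirit of Drill's batch model: a columnar
>>>>> // batch sized so that one batch stays close to the L2 cache. The
>>>>> // names and the 256 KB budget are illustrative assumptions.
>>>>> public class ColumnarBatch {
>>>>>
>>>>>     private static final int BATCH_BYTE_BUDGET = 256 * 1024; // ~L2-sized
>>>>>
>>>>>     private final byte[][][] columns; // [column][row] -> value bytes
>>>>>     private final int rowCount;
>>>>>
>>>>>     public ColumnarBatch(byte[][][] columns, int rowCount) {
>>>>>         this.columns = columns;
>>>>>         this.rowCount = rowCount;
>>>>>     }
>>>>>
>>>>>     public int rowCount() {
>>>>>         return rowCount;
>>>>>     }
>>>>>
>>>>>     public byte[] value(int column, int row) {
>>>>>         return columns[column][row];
>>>>>     }
>>>>>
>>>>>     // how many rows fit in one cache-friendly batch for a row width
>>>>>     public static int rowsPerBatch(int avgRowBytes) {
>>>>>         return Math.max(1, BATCH_BYTE_BUDGET / avgRowBytes);
>>>>>     }
>>>>> }
>>>>>
>>>>> A join that consumes such batches pays one method call per batch
>>>>> plus a tight inner loop over the rows, instead of one virtual
>>>>> next() call per row. That is the saving I am referring to.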