[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions
[ https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158521#comment-15158521 ]

ASF GitHub Bot commented on FLINK-3385:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1680

> Fix outer join skipping unprobed partitions
> ---
>
>                 Key: FLINK-3385
>                 URL: https://issues.apache.org/jira/browse/FLINK-3385
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Runtime
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>            Priority: Blocker
>             Fix For: 1.0.0
>
>
> {{MutableHashTable.nextRecord}} performs three steps for a build-side outer
> join:
> {code}
> public boolean nextRecord() throws IOException {
>     if (buildSideOuterJoin) {
>         return processProbeIter() || processUnmatchedBuildIter() || prepareNextPartition();
>     } else {
>         return processProbeIter() || prepareNextPartition();
>     }
> }
> {code}
> {{MutableHashTable.processUnmatchedBuildIter}} eventually calls through to
> {{MutableHashTable.moveToNextBucket}} which is unable to process spilled
> partitions:
> {code}
> if (p.isInMemory()) {
>     ...
> } else {
>     return false;
> }
> {code}
> {{MutableHashTable.prepareNextPartition}} calls
> {{HashPartition.finalizeProbePhase}} which only spills the partition (to be
> read and processed in the next instantiation of {{MutableHashTable}}) if
> probe-side records were spilled. In an equi-join this is fine but with an
> outer join the unmatched build-side records must still be retained (though no
> further probing is necessary, so could this be short-circuited when loaded by
> the next {{MutableHashTable}}?).
> {code}
> if (isInMemory()) {
>     ...
> }
> else if (this.probeSideRecordCounter == 0) {
>     // partition is empty, no spilled buffers
>     // return the memory buffer
>     freeMemory.add(this.probeSideBuffer.getCurrentSegment());
>     // delete the spill files
>     this.probeSideChannel.close();
>     this.buildSideChannel.deleteChannel();
>     this.probeSideChannel.deleteChannel();
>     return 0;
> }
> else {
>     // flush the last probe side buffer and register this partition as pending
>     this.probeSideBuffer.close();
>     this.probeSideChannel.close();
>     spilledPartitions.add(this);
>     return 1;
> }
> {code}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
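The failure mode described in the issue can be illustrated with a small toy model. This is only a sketch under stated assumptions: `OuterJoinLossSketch`, `Partition`, `reachableUnmatched` and `lostUnmatched` are hypothetical names that do not exist in Flink, and the model reduces each partition to three numbers. It assumes, as the description says, that unmatched build-side records are emitted only for in-memory partitions, and that a spilled partition survives only when probe-side records were spilled for it.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of the reported behaviour; all names are hypothetical, not Flink classes. */
final class OuterJoinLossSketch {

    /** Minimal stand-in for a hash partition after the probe phase. */
    static final class Partition {
        final boolean inMemory;
        final long unmatchedBuildRecords;
        final long probeSideRecordCounter;

        Partition(boolean inMemory, long unmatchedBuildRecords, long probeSideRecordCounter) {
            this.inMemory = inMemory;
            this.unmatchedBuildRecords = unmatchedBuildRecords;
            this.probeSideRecordCounter = probeSideRecordCounter;
        }
    }

    /** Counts unmatched build-side records that can still be emitted under the old logic. */
    static long reachableUnmatched(List<Partition> partitions) {
        long reachable = 0;
        for (Partition p : partitions) {
            if (p.inMemory) {
                // in-memory buckets can be walked for unmatched build-side records
                reachable += p.unmatchedBuildRecords;
            } else if (p.probeSideRecordCounter > 0) {
                // registered as pending, so a later hash table pass reads it again
                reachable += p.unmatchedBuildRecords;
            }
            // otherwise: spilled and unprobed, so the spill files are deleted
        }
        return reachable;
    }

    /** Counts unmatched build-side records that are silently dropped. */
    static long lostUnmatched(List<Partition> partitions) {
        long total = 0;
        for (Partition p : partitions) {
            total += p.unmatchedBuildRecords;
        }
        return total - reachableUnmatched(partitions);
    }

    public static void main(String[] args) {
        List<Partition> partitions = Arrays.asList(
            new Partition(true, 10, 0),   // in memory
            new Partition(false, 7, 3),   // spilled, probe records spilled too
            new Partition(false, 5, 0));  // spilled, never probed
        System.out.println("reachable = " + reachableUnmatched(partitions));
        System.out.println("lost = " + lostUnmatched(partitions));
    }
}
```

In this model the third partition is the problematic case: it was spilled, saw no probe-side records, and its build-side records are dropped even though an outer join has to emit them.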
[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions
[ https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158005#comment-15158005 ]

ASF GitHub Bot commented on FLINK-3385:
---

Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1680#issuecomment-187448014

    Thanks for the fix! Will merge this.
[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions
[ https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15157252#comment-15157252 ]

ASF GitHub Bot commented on FLINK-3385:
---

Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1680#issuecomment-187258482

    Unprobed, spilled partitions are now retained when performing a build-side outer join (and still discarded in all other cases). When these unprobed, spilled partitions are loaded by `MutableHashTable.prepareNextPartition`, we simply create an iterator over the spilled segments, which are read by the appropriate HashJoinIterator in a single loop.
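The retention rule described in the comment above can be sketched as follows. This is a hedged illustration, not the code from the pull request: `RetainUnprobedSketch`, `Outcome`, `finalizeSpilledPartition` and `readUnprobed` are hypothetical names, and the spilled segments are modelled as plain in-memory lists rather than Flink memory segments.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch of the described fix; all names are hypothetical, not Flink classes. */
final class RetainUnprobedSketch {

    enum Outcome { DISCARD, PENDING }

    /** Decides what happens to a spilled partition at the end of the probe phase. */
    static Outcome finalizeSpilledPartition(long probeSideRecordCounter, boolean buildSideOuterJoin) {
        if (probeSideRecordCounter == 0 && !buildSideOuterJoin) {
            // nothing to probe and no unmatched records to emit: drop the spill files
            return Outcome.DISCARD;
        }
        // either probing is still needed, or the build-side records must be emitted
        return Outcome.PENDING;
    }

    /** Reads every record of an unprobed partition in one pass over its segments. */
    static <T> List<T> readUnprobed(List<List<T>> spilledSegments) {
        List<T> unmatched = new ArrayList<>();
        for (List<T> segment : spilledSegments) {
            // no probe-side records exist, so every build-side record is unmatched
            unmatched.addAll(segment);
        }
        return unmatched;
    }

    public static void main(String[] args) {
        System.out.println(finalizeSpilledPartition(0, true));
        System.out.println(finalizeSpilledPartition(0, false));
        System.out.println(readUnprobed(Arrays.asList(Arrays.asList(1L, 2L), Arrays.asList(3L))));
    }
}
```

The point of the sketch is that an unprobed partition needs no hash table at all when it is reloaded: with no probe-side records, every build-side record is unmatched and can be streamed out directly.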
[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions
[ https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15157232#comment-15157232 ]

ASF GitHub Bot commented on FLINK-3385:
---

Github user hsaputra commented on the pull request:

    https://github.com/apache/flink/pull/1680#issuecomment-187253798

    Could you add a description of how the fix is done and the expected result? Thanks
[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions
[ https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15156041#comment-15156041 ]

Greg Hogan commented on FLINK-3385:
---

Pull request is [here|https://github.com/apache/flink/pull/1680]. I had to rename the pull request; it looks like when there are multiple commits, the default name is the branch name rather than the commit log.
[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions
[ https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15154905#comment-15154905 ]

Greg Hogan commented on FLINK-3385:
---

Will submit the pull request when Travis has finished. There was an additional fix, discovered with the unit tests running under severe memory constraints, where {{processUnmatchedBuildIter}} would cause a {{NullPointerException}} when all partitions had spilled to disk. That simple fix is included as a hotfix.
[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions
[ https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15154530#comment-15154530 ]

Fabian Hueske commented on FLINK-3385:
---

Great, thanks a lot!
[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions
[ https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15154512#comment-15154512 ]

Greg Hogan commented on FLINK-3385:
---

Yes, I literally have my test passing as of 5 minutes ago and was working on tidying it up.
[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions
[ https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15154506#comment-15154506 ]

Fabian Hueske commented on FLINK-3385:
---

Hi [~greghogan],

have you started to work on this issue? We are trying to close the blocking issues for the 1.0 release.

Thanks, Fabian
[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions
[ https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15141490#comment-15141490 ]

Greg Hogan commented on FLINK-3385:
---

Here is the simple function I was using to isolate this case, constraining available memory with -Xmx200m. What is the best way to force spilling in an automated test?

{code}
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;

import static org.junit.Assert.assertEquals;

public void testBrokenOuterJoin() throws Exception {
    final int NUMBER_OF_ELEMENTS = 2 * 1000 * 1000;

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // first input: becomes the build side under REPARTITION_HASH_FIRST
    DataSet<Long> ds1 = env
        .fromParallelCollection(new NumberSequenceIterator(0, NUMBER_OF_ELEMENTS - 1), Long.class);

    // second input: every record is filtered out, so the probe side is empty
    DataSet<Long> ds2 = ds1
        .filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return false;
            }
        });

    DataSet<Long> joinDs = ds1
        .leftOuterJoin(ds2, JoinHint.REPARTITION_HASH_FIRST)
        .where("*")
        .equalTo("*")
        .with(new JoinFunction<Long, Long, Long>() {
            @Override
            public Long join(Long first, Long second) throws Exception {
                return (first == null) ? second : first;
            }
        });

    // List<Long> result = joinDs
    //     .collect();
    // Collections.sort(result);

    long count = joinDs
        .count();

    // every build-side record is unmatched and must appear in the output
    assertEquals(NUMBER_OF_ELEMENTS, count);
}
{code}

> Fix outer join skipping unprobed partitions
> ---
>
>                 Key: FLINK-3385
>                 URL: https://issues.apache.org/jira/browse/FLINK-3385
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Runtime
>            Reporter: Greg Hogan
>            Priority: Critical
>             Fix For: 1.0.0
[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions
[ https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15141681#comment-15141681 ]

Greg Hogan commented on FLINK-3385:
---

Yes, I will look into this some more.
[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions
[ https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15141551#comment-15141551 ]

Fabian Hueske commented on FLINK-3385:
---

Thanks [~greghogan]! This is obviously a serious issue with the build-side outer join.

The {{HashTableITCase}} has a few tests that enforce spilling to disk by providing only a small amount of memory to the hash table. It should be easy to port your code into that test class.

This bug should be fixed for the upcoming 1.0.0 release. Do you want to do it?