[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions

2016-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158521#comment-15158521
 ] 

ASF GitHub Bot commented on FLINK-3385:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1680


> Fix outer join skipping unprobed partitions
> ---
>
> Key: FLINK-3385
> URL: https://issues.apache.org/jira/browse/FLINK-3385
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Blocker
> Fix For: 1.0.0
>
>
> {{MutableHashTable.nextRecord}} performs three steps for a build-side outer 
> join:
> {code}
> public boolean nextRecord() throws IOException {
>     if (buildSideOuterJoin) {
>         return processProbeIter() || processUnmatchedBuildIter() || prepareNextPartition();
>     } else {
>         return processProbeIter() || prepareNextPartition();
>     }
> }
> {code}
> {{MutableHashTable.processUnmatchedBuildIter}} eventually calls through to 
> {{MutableHashTable.moveToNextBucket}} which is unable to process spilled 
> partitions:
> {code}
> if (p.isInMemory()) {
>     ...
> } else {
>     return false;
> }
> {code}
> {{MutableHashTable.prepareNextPartition}} calls 
> {{HashPartition.finalizeProbePhase}} which only spills the partition (to be 
> read and processed in the next instantiation of {{MutableHashTable}}) if 
> probe-side records were spilled. In an equi-join this is fine but with an 
> outer join the unmatched build-side records must still be retained (though no 
> further probing is necessary, so could this be short-circuited when loaded by 
> the next {{MutableHashTable}}?).
> {code}
> if (isInMemory()) {
>     ...
> }
> else if (this.probeSideRecordCounter == 0) {
>     // partition is empty, no spilled buffers
>     // return the memory buffer
>     freeMemory.add(this.probeSideBuffer.getCurrentSegment());
>     // delete the spill files
>     this.probeSideChannel.close();
>     this.buildSideChannel.deleteChannel();
>     this.probeSideChannel.deleteChannel();
>     return 0;
> }
> else {
>     // flush the last probe side buffer and register this partition as pending
>     this.probeSideBuffer.close();
>     this.probeSideChannel.close();
>     spilledPartitions.add(this);
>     return 1;
> }
> {code}
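The three-step chain in `nextRecord` relies on `||` short-circuiting: each call emits at most one record, taken from the first step that still has work. The toy model below (plain Java, not Flink code) mimics only that control flow. The class `NextRecordModel` and its queues of precomputed output strings are hypothetical stand-ins for the real probe iterator, unmatched-build iterator and pending partitions; in the model, `prepareNextPartition` loads the next partition and then retries the chain.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model (not Flink code) of the short-circuit chain in MutableHashTable.nextRecord.
// Each "partition" is a pair {joined results, unmatched build records}, precomputed as strings.
final class NextRecordModel {
    private final boolean buildSideOuterJoin;
    private final Deque<String[][]> pendingPartitions = new ArrayDeque<>();
    private final Deque<String> probeIter = new ArrayDeque<>();
    private final Deque<String> unmatchedBuildIter = new ArrayDeque<>();
    private final List<String> emitted = new ArrayList<>();

    NextRecordModel(boolean buildSideOuterJoin, String[][]... partitions) {
        this.buildSideOuterJoin = buildSideOuterJoin;
        this.pendingPartitions.addAll(Arrays.asList(partitions));
    }

    // Step 1: emit the next joined result of the current partition.
    private boolean processProbeIter() {
        return emitFrom(probeIter);
    }

    // Step 2 (build-side outer join only): emit the next build record that found no match.
    private boolean processUnmatchedBuildIter() {
        return emitFrom(unmatchedBuildIter);
    }

    // Step 3: load the next pending partition and retry the chain.
    private boolean prepareNextPartition() {
        String[][] next = pendingPartitions.poll();
        if (next == null) {
            return false; // no partitions left: the join is finished
        }
        probeIter.addAll(Arrays.asList(next[0]));
        unmatchedBuildIter.addAll(Arrays.asList(next[1]));
        return nextRecord();
    }

    private boolean emitFrom(Deque<String> source) {
        String record = source.poll();
        if (record == null) {
            return false;
        }
        emitted.add(record);
        return true;
    }

    // Same structure as the quoted nextRecord(): each call emits at most one record.
    boolean nextRecord() {
        if (buildSideOuterJoin) {
            return processProbeIter() || processUnmatchedBuildIter() || prepareNextPartition();
        } else {
            return processProbeIter() || prepareNextPartition();
        }
    }

    List<String> drain() {
        while (nextRecord()) {
            // keep pulling until every step reports that it is exhausted
        }
        return emitted;
    }
}
```

With `buildSideOuterJoin = true`, draining the partitions `{m1 | u1}` and `{m2 | u2}` yields `[m1, u1, m2, u2]`; with `false` the unmatched-build step never runs and only `[m1, m2]` is emitted.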

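A similarly simplified sketch of why the `moveToNextBucket` guard loses records: if the scan for unmatched build-side records passes over every partition that is not in memory, the unmatched records of spilled partitions can only be emitted if those partitions are retained and processed later. `ToyPartition` and `UnmatchedBuildScan` are hypothetical names, and the real code walks hash buckets rather than lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy partition: whether it is still in memory, plus the build-side
// records that no probe-side record matched.
final class ToyPartition {
    final boolean inMemory;
    final List<Long> unmatchedBuildRecords;

    ToyPartition(boolean inMemory, Long... unmatchedBuildRecords) {
        this.inMemory = inMemory;
        this.unmatchedBuildRecords = Arrays.asList(unmatchedBuildRecords);
    }
}

final class UnmatchedBuildScan {
    // Mirrors the quoted guard: only in-memory partitions are scanned, and a spilled
    // partition is passed over, so its unmatched records are not emitted here.
    static List<Long> scanInMemoryOnly(ToyPartition... partitions) {
        List<Long> emitted = new ArrayList<>();
        for (ToyPartition p : partitions) {
            if (p.inMemory) {
                emitted.addAll(p.unmatchedBuildRecords);
            }
        }
        return emitted;
    }

    // Counts the unmatched records that are lost unless spilled partitions are
    // retained and processed later.
    static int dependOnRetainedSpills(ToyPartition... partitions) {
        int count = 0;
        for (ToyPartition p : partitions) {
            if (!p.inMemory) {
                count += p.unmatchedBuildRecords.size();
            }
        }
        return count;
    }
}
```

For an in-memory partition holding `1, 2` and a spilled one holding `3, 4, 5`, the scan emits `[1, 2]`, and the remaining three records are emitted only if the spilled partition is kept.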


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions

2016-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158005#comment-15158005
 ] 

ASF GitHub Bot commented on FLINK-3385:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1680#issuecomment-187448014
  
Thanks for the fix! Will merge this.




[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions

2016-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15157252#comment-15157252
 ] 

ASF GitHub Bot commented on FLINK-3385:
---

Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1680#issuecomment-187258482
  
Unprobed, spilled partitions are now retained when performing a build-side 
outer join (and still discarded in all other cases).
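The retention rule above can be summarized as a small decision function. This is a hypothetical sketch: the enum and method names are invented for illustration and do not come from `HashPartition.finalizeProbePhase`. A spilled partition with probe-side records is re-processed as before, an unprobed one is now kept for a build-side outer join, and it is discarded otherwise.

```java
// Hypothetical decision table for a partition that has spilled to disk; names are
// illustrative and do not come from HashPartition.finalizeProbePhase.
final class SpilledPartitionPolicy {
    enum Outcome { DISCARD, REPROCESS_WITH_PROBE, EMIT_UNMATCHED_BUILD_ONLY }

    // Behaviour quoted in the issue: keep the spilled partition only if probe-side records were spilled.
    static Outcome beforeFix(long probeSideRecordCounter) {
        return probeSideRecordCounter == 0 ? Outcome.DISCARD : Outcome.REPROCESS_WITH_PROBE;
    }

    // Behaviour described in the pull request: an unprobed spilled partition is also kept for a
    // build-side outer join; it needs no probing, since all of its build records are unmatched.
    static Outcome afterFix(long probeSideRecordCounter, boolean buildSideOuterJoin) {
        if (probeSideRecordCounter > 0) {
            return Outcome.REPROCESS_WITH_PROBE;
        }
        return buildSideOuterJoin ? Outcome.EMIT_UNMATCHED_BUILD_ONLY : Outcome.DISCARD;
    }
}
```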

When these unprobed, spilled partitions are loaded by 
`MutableHashTable.prepareNextPartition` we simply create an iterator over the 
spilled segments which are read by the appropriate HashJoinIterator in a single 
loop.
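Because no probe-side record reached an unprobed partition, every build-side record in it is unmatched, so reading it back needs no hash table, only one pass over its segments. The sketch below models that single loop under stated assumptions: segments are plain `long[]` arrays, `SpilledSegmentsIterator` and `UnprobedPartitionJoin` are hypothetical names, and each emitted pair is rendered as a string rather than handed to a join function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical iterator over the spilled segments of an unprobed build-side partition;
// each segment is modelled as a plain long[] of build records.
final class SpilledSegmentsIterator implements Iterator<Long> {
    private final List<long[]> segments;
    private int segment = 0;
    private int offset = 0;

    SpilledSegmentsIterator(List<long[]> segments) {
        this.segments = segments;
    }

    static SpilledSegmentsIterator of(long[]... segments) {
        return new SpilledSegmentsIterator(Arrays.asList(segments));
    }

    @Override
    public boolean hasNext() {
        // advance past exhausted or empty segments
        while (segment < segments.size() && offset >= segments.get(segment).length) {
            segment++;
            offset = 0;
        }
        return segment < segments.size();
    }

    @Override
    public Long next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return segments.get(segment)[offset++];
    }
}

final class UnprobedPartitionJoin {
    // No probe-side record reached this partition, so every build record is unmatched:
    // one loop pairs each of them with a null probe side, as an outer join requires.
    static List<String> emitUnmatched(Iterator<Long> buildRecords) {
        List<String> out = new ArrayList<>();
        while (buildRecords.hasNext()) {
            Long build = buildRecords.next();
            out.add("(" + build + ", null)");
        }
        return out;
    }
}
```

Iterating the segments `{1, 2}`, `{}` and `{3}` yields `[(1, null), (2, null), (3, null)]`; the empty segment is skipped inside `hasNext`.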




[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions

2016-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15157232#comment-15157232
 ] 

ASF GitHub Bot commented on FLINK-3385:
---

Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/1680#issuecomment-187253798
  
Could add description on how is the fix being done and proposed result. 
Thanks




[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions

2016-02-21 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15156041#comment-15156041
 ] 

Greg Hogan commented on FLINK-3385:
---

The pull request is [here|https://github.com/apache/flink/pull/1680]. I had to rename the
pull request; it looks like when there are multiple commits, the default name is
the branch name rather than the commit log.



[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions

2016-02-19 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15154905#comment-15154905
 ] 

Greg Hogan commented on FLINK-3385:
---

Will submit the pull request when Travis has finished. There was an additional 
fix, discovered with the unit tests running under severe memory constraints, 
where {{processUnmatchedBuildIter}} would cause a {{NullPointerException}} when 
all partitions had spilled to disk. That simple fix is included as a hotfix.
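The thread does not show the hotfix itself. Purely as a hypothetical illustration of the failure mode, the sketch below guards an unmatched-build step against a missing iterator, which is one way such a step could end up dereferencing `null` when no partition remains in memory. `UnmatchedBuildStep` and its field are invented names, not Flink's.

```java
import java.util.Iterator;

// Hypothetical sketch, not the actual Flink hotfix.
final class UnmatchedBuildStep {
    // Assumption: this iterator is null when every partition has spilled to disk,
    // because there is no in-memory table left to scan.
    private final Iterator<Long> unmatchedBuildIterator;
    private Long current;

    UnmatchedBuildStep(Iterator<Long> unmatchedBuildIterator) {
        this.unmatchedBuildIterator = unmatchedBuildIterator;
    }

    // Returns false ("nothing to emit") instead of throwing a NullPointerException,
    // letting the caller fall through to the next step of the chain.
    boolean processUnmatchedBuildIter() {
        if (unmatchedBuildIterator == null || !unmatchedBuildIterator.hasNext()) {
            return false;
        }
        current = unmatchedBuildIterator.next();
        return true;
    }

    Long current() {
        return current;
    }
}
```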



[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions

2016-02-19 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15154530#comment-15154530
 ] 

Fabian Hueske commented on FLINK-3385:
--

Great, thanks a lot!



[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions

2016-02-19 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15154512#comment-15154512
 ] 

Greg Hogan commented on FLINK-3385:
---

Yes, I literally have my test passing as of 5 minutes ago and was working on 
tidying it up.



[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions

2016-02-19 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15154506#comment-15154506
 ] 

Fabian Hueske commented on FLINK-3385:
--

Hi [~greghogan], have you started to work on this issue?
We are trying to close the blocking issues for the 1.0 release.

Thanks, Fabian



[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions

2016-02-10 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15141490#comment-15141490
 ] 

Greg Hogan commented on FLINK-3385:
---

Here is the simple function I was using to isolate this case, with available
memory constrained via -Xmx200m. What is the best way to force spilling in an
automated test?

{code}
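import static org.junit.Assert.assertEquals;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;
import org.junit.Test;

// enclosing class added so the snippet compiles; the name is illustrative
public class BrokenOuterJoinTest {

    @Test
    public void testBrokenOuterJoin() throws Exception {
        final int NUMBER_OF_ELEMENTS = 2 * 1000 * 1000;

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // outer (left) side: the sequence 0 .. NUMBER_OF_ELEMENTS - 1
        DataSet<Long> ds1 = env
            .fromParallelCollection(new NumberSequenceIterator(0, NUMBER_OF_ELEMENTS - 1), Long.class);

        // inner (right) side: empty, since the filter rejects every record
        DataSet<Long> ds2 = ds1
            .filter(new FilterFunction<Long>() {
                @Override
                public boolean filter(Long value) throws Exception {
                    return false;
                }
            });

        // REPARTITION_HASH_FIRST builds the hash table from the left (outer) input,
        // which makes this a build-side outer join
        DataSet<Long> joinDs = ds1
            .leftOuterJoin(ds2, JoinHint.REPARTITION_HASH_FIRST)
            .where("*")
            .equalTo("*")
            .with(new JoinFunction<Long, Long, Long>() {
                @Override
                public Long join(Long first, Long second) throws Exception {
                    return (first == null) ? second : first;
                }
            });

//      List<Long> result = joinDs
//          .collect();
//      Collections.sort(result);

        long count = joinDs
            .count();

        // all left-side records must be emitted, including those in spilled partitions
        assertEquals(NUMBER_OF_ELEMENTS, count);
    }
}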
{code}



[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions

2016-02-10 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15141681#comment-15141681
 ] 

Greg Hogan commented on FLINK-3385:
---

Yes, I will look into this some more.



[jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions

2016-02-10 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15141551#comment-15141551
 ] 

Fabian Hueske commented on FLINK-3385:
--

Thanks [~greghogan]! This is obviously a serious issue with the build-side 
outer join.
The {{HashTableITCase}} has a few tests that force spilling to disk by giving
the hash table very little memory. It should be easy to port your code into
that test class.

This bug should be fixed for the upcoming 1.0.0 release. 
Do you want to do it?
