[ 
https://issues.apache.org/jira/browse/HBASE-14267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14732150#comment-14732150
 ] 

Qianxi Zhang commented on HBASE-14267:
--------------------------------------

Thanks stack, you are right.
When the scanner restarts, TableInputFormat resumes the scan at the last row, 
which has been modified through the Result.
TableRecordReaderImpl
{code}
            try {
                value = this.scanner.next();
                if (logScannerActivity) {
                    rowcount++;
                    if (rowcount >= logPerRowCount) {
                        long now = System.currentTimeMillis();
                        LOG.info("Mapper took " + (now - timestamp)
                                + "ms to process " + rowcount + " rows");
                        timestamp = now;
                        rowcount = 0;
                    }
                }
            } catch (IOException e) {
                // try to handle all IOExceptions by restarting
                // the scanner, if the second call fails, it will be rethrown
                LOG.info("recovered from " + StringUtils.stringifyException(e));
                if (lastSuccessfulRow == null) {
                    LOG.warn("We are restarting the first next() invocation," +
                            " if your mapper has restarted a few other times like this" +
                            " then you should consider killing this job and investigate" +
                            " why it's taking so long.");
                }
                if (lastSuccessfulRow == null) {
                    restart(scan.getStartRow());
                } else {
                    restart(lastSuccessfulRow);
                    scanner.next();    // skip presumed already mapped row
                }
{code}

{code}
if (value != null && value.size() > 0) {
                key.set(value.getRow());
                lastSuccessfulRow = key.get();
                lastKey = value.getRow();
                return true;
            }
{code}

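lastSuccessfulRow is the same byte array as the row key in the Result, so a 
change made to that array in the mapper also changes the row used for the restart.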
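
The aliasing described here can be reproduced without HBase. The following is a minimal sketch, not the HBase code: FakeResult, FakeReader, rememberShared, rememberCopy and reverseInPlace are hypothetical stand-ins invented for illustration. It assumes only what the excerpts show, namely that the reader keeps a reference to the array returned by getRow(). The defensive copy is one possible remedy and is not necessarily what the attached patch does.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class RowAliasingSketch {

    // Hypothetical stand-in for Result: hands out the same cached array on every call.
    static final class FakeResult {
        private final byte[] row;
        FakeResult(byte[] row) { this.row = row; }
        byte[] getRow() { return row; }
    }

    // Hypothetical stand-in for the record reader's restart bookkeeping.
    static final class FakeReader {
        byte[] lastSuccessfulRow;

        // Keeps the caller-visible array itself (the aliasing case).
        void rememberShared(FakeResult r) {
            lastSuccessfulRow = r.getRow();
        }

        // Keeps a private copy, so later edits by the mapper are not seen.
        void rememberCopy(FakeResult r) {
            byte[] row = r.getRow();
            lastSuccessfulRow = Arrays.copyOf(row, row.length);
        }
    }

    // What the mapper in the report does: reverse the row bytes in place.
    static void reverseInPlace(byte[] b) {
        for (int i = 0, j = b.length - 1; i < j; i++, j--) {
            byte tmp = b[i];
            b[i] = b[j];
            b[j] = tmp;
        }
    }

    // Returns the row the reader would restart from after the mapper has run.
    static String restartRowAfterMapper(boolean copy) {
        FakeResult result = new FakeResult("row-123".getBytes(StandardCharsets.UTF_8));
        FakeReader reader = new FakeReader();
        if (copy) {
            reader.rememberCopy(result);
        } else {
            reader.rememberShared(result);
        }
        reverseInPlace(result.getRow()); // mapper modifies the row it was given
        return new String(reader.lastSuccessfulRow, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println("shared reference: " + restartRowAfterMapper(false));
        System.out.println("defensive copy:   " + restartRowAfterMapper(true));
    }
}
```

With the shared reference the restart row follows the mapper's in-place change; with the copy it stays at the row that was actually scanned.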

> In Mapreduce on HBase scenario, restart in TableInputFormat will result in 
> getting wrong data.
> ----------------------------------------------------------------------------------------------
>
>                 Key: HBASE-14267
>                 URL: https://issues.apache.org/jira/browse/HBASE-14267
>             Project: HBase
>          Issue Type: Bug
>          Components: Client, mapreduce
>            Reporter: Qianxi Zhang
>            Assignee: Qianxi Zhang
>         Attachments: HBASE_14267_trunk_v1.patch
>
>
> When I run a MapReduce job on HBase, I modify the row obtained from 
> Result.getRow(), for example by reversing it. Since my program does 
> complicated data processing, it takes a long time, and the lease in the 
> region server expires. 
> Result#195
> {code}
>   public byte [] getRow() {
>     if (this.row == null) {
>       this.row = (this.cells == null || this.cells.length == 0) ?
>           null :
>           CellUtil.cloneRow(this.cells[0]);
>     }
>     return this.row;
>   }
> {code}
> TableInputFormat will restart the scan from the last row, but that row has 
> been modified, so it will read the wrong data.
> TableRecordReaderImpl#218
> {code}
>       } catch (IOException e) {
>         // do not retry if the exception tells us not to do so
>         if (e instanceof DoNotRetryIOException) {
>           throw e;
>         }
>         // try to handle all other IOExceptions by restarting
>         // the scanner, if the second call fails, it will be rethrown
>         LOG.info("recovered from " + StringUtils.stringifyException(e));
>         if (lastSuccessfulRow == null) {
>           LOG.warn("We are restarting the first next() invocation," +
>               " if your mapper has restarted a few other times like this" +
>               " then you should consider killing this job and investigate" +
>               " why it's taking so long.");
>         }
>         if (lastSuccessfulRow == null) {
>           restart(scan.getStartRow());
>         } else {
>           restart(lastSuccessfulRow);
>           scanner.next();    // skip presumed already mapped row
>         }
>         value = scanner.next();
>         if (value != null && value.isStale()) numStale++;
>         numRestarts++;
>       }
>       if (value != null && value.size() > 0) {
>         key.set(value.getRow());
>         lastSuccessfulRow = key.get();
>         return true;
>       }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)