PHOENIX-4955 - PhoenixIndexImportDirectMapper undercounts failed records
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/dd81989f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/dd81989f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/dd81989f

Branch: refs/heads/4.x-cdh5.15
Commit: dd81989fab80cb283678218ada0c0359930731c8
Parents: 590f88b
Author: Geoffrey Jacoby <gjac...@apache.org>
Authored: Fri Nov 16 21:57:45 2018 +0000
Committer: Pedro Boado <pbo...@apache.org>
Committed: Tue Nov 27 15:12:05 2018 +0000

----------------------------------------------------------------------
 .../mapreduce/index/PhoenixIndexImportDirectMapper.java | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/dd81989f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index eb4bc0e..e2ac491 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -68,6 +68,8 @@ public class PhoenixIndexImportDirectMapper extends
     private long batchSizeBytes;
 
     private MutationState mutationState;
 
+    private int currentBatchCount = 0;
+
     @Override
     protected void setup(final Context context) throws IOException, InterruptedException {
@@ -113,6 +115,7 @@ public class PhoenixIndexImportDirectMapper extends
             throws IOException, InterruptedException {
 
         try {
+            currentBatchCount++;
             final List<Object> values = record.getValues();
             indxWritable.setValues(values);
             indxWritable.write(this.pStatement);
@@ -125,9 +128,8 @@ public class PhoenixIndexImportDirectMapper extends
             }
             // Keep accumulating Mutations till batch size
             mutationState.join(currentMutationState);
-
             // Write Mutation Batch
-            if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize == 0) {
+            if (currentBatchCount % batchSize == 0) {
                 writeBatch(mutationState, context);
                 mutationState = null;
             }
@@ -136,7 +138,7 @@ public class PhoenixIndexImportDirectMapper extends
             context.progress();
         } catch (SQLException e) {
             LOG.error(" Error {} while read/write of a record ", e.getMessage());
-            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(currentBatchCount);
             throw new RuntimeException(e);
         }
         context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
@@ -157,6 +159,7 @@ public class PhoenixIndexImportDirectMapper extends
                 mutationPair.getSecond().size());
         }
         connection.rollback();
+        currentBatchCount = 0;
     }
 
     @Override
@@ -173,7 +176,7 @@ public class PhoenixIndexImportDirectMapper extends
             super.cleanup(context);
         } catch (SQLException e) {
             LOG.error(" Error {} while read/write of a record ", e.getMessage());
-            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(currentBatchCount);
             throw new RuntimeException(e);
         } finally {
             if (connection != null) {