[ https://issues.apache.org/jira/browse/TEZ-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
László Bodor updated TEZ-4499:
------------------------------
Description:
Both ShuffleManager and ShuffleScheduler have fields like these:
{code}
this.approximateInputRecords =
    inputContext.getCounters().findCounter(TaskCounter.APPROXIMATE_INPUT_RECORDS);
this.shuffledInputsCounter =
    inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
this.failedShufflesCounter =
    inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
this.bytesShuffledCounter =
    inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
this.decompressedDataSizeCounter =
    inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
this.bytesShuffledToDiskCounter =
    inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
this.bytesShuffledToMemCounter =
    inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
this.bytesShuffledDirectDiskCounter =
    inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
...
ioErrsCounter =
    inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
        ShuffleErrors.IO_ERROR.toString());
wrongLengthErrsCounter =
    inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
        ShuffleErrors.WRONG_LENGTH.toString());
badIdErrsCounter =
    inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
        ShuffleErrors.BAD_ID.toString());
wrongMapErrsCounter =
    inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
        ShuffleErrors.WRONG_MAP.toString());
connectionErrsCounter =
    inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
        ShuffleErrors.CONNECTION.toString());
wrongReduceErrsCounter =
    inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
        ShuffleErrors.WRONG_REDUCE.toString());
{code}
We should collect them into a separate class such as "ShuffleCounters", initialized
from an inputContext, and increment them through an interface.
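A minimal sketch of what such a class might look like, under stated assumptions. To stay runnable without Tez on the classpath, it uses a hypothetical CounterRegistry in place of the TezCounters returned by inputContext.getCounters() (with AtomicLong in place of TezCounter), a ShuffleError enum mirroring ShuffleErrors, and a hypothetical ShuffleErrorReporter interface. The "Shuffle Errors" group name, the simplified "TaskCounter" group key, and the choice of only two TaskCounter counters are assumptions for illustration; the real counter set is larger.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the TezCounters returned by inputContext.getCounters():
// findCounter creates a counter on first lookup and returns the same one afterwards.
// Lookups are not thread-safe; counters are meant to be resolved once, up front.
final class CounterRegistry {
  private final Map<String, AtomicLong> counters = new HashMap<>();

  AtomicLong findCounter(String group, String name) {
    return counters.computeIfAbsent(group + "::" + name, key -> new AtomicLong());
  }
}

// Mirrors the ShuffleErrors values used in the issue.
enum ShuffleError { IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP, CONNECTION, WRONG_REDUCE }

// Hypothetical narrow interface that fetchers would report errors through.
interface ShuffleErrorReporter {
  void recordError(ShuffleError error);
}

// Sketch of the proposed "ShuffleCounters": looks every counter up once and
// exposes increment methods, so callers hold one object instead of many counters.
final class ShuffleCounters implements ShuffleErrorReporter {
  // Assumed group name; simplified stand-in for SHUFFLE_ERR_GRP_NAME.
  static final String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
  // Simplified group key for the TaskCounter-based counters (assumption).
  private static final String TASK_GRP_NAME = "TaskCounter";

  private final Map<ShuffleError, AtomicLong> errorCounters = new EnumMap<>(ShuffleError.class);
  private final AtomicLong shuffledInputs;
  private final AtomicLong bytesShuffled;

  ShuffleCounters(CounterRegistry registry) {
    for (ShuffleError error : ShuffleError.values()) {
      errorCounters.put(error, registry.findCounter(SHUFFLE_ERR_GRP_NAME, error.name()));
    }
    shuffledInputs = registry.findCounter(TASK_GRP_NAME, "NUM_SHUFFLED_INPUTS");
    bytesShuffled = registry.findCounter(TASK_GRP_NAME, "SHUFFLE_BYTES");
  }

  @Override
  public void recordError(ShuffleError error) {
    errorCounters.get(error).incrementAndGet();
  }

  // Records one successfully shuffled input of the given size.
  void recordShuffledInput(long bytes) {
    shuffledInputs.incrementAndGet();
    bytesShuffled.addAndGet(bytes);
  }

  long errorCount(ShuffleError error) {
    return errorCounters.get(error).get();
  }

  long shuffledInputs() {
    return shuffledInputs.get();
  }

  long bytesShuffled() {
    return bytesShuffled.get();
  }
}
```

Because every counter is resolved once in the constructor, a fetcher would only need a single ShuffleCounters instance (or just its ShuffleErrorReporter view) rather than one parameter per counter, while increments still land in the shared counters of the input context.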
Having these as fields in the ShuffleScheduler class contributes heavily to monstrous
object creation calls like this one:
{code}
return new FetcherOrderedGrouped(httpConnectionParams, ShuffleScheduler.this, allocator,
    exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength,
    codec, conf, localFs, localDiskFetchEnabled, localHostname, shufflePort, mapHost,
    ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
    connectionErrsCounter, wrongReduceErrsCounter, asyncHttp, sslShuffle,
    verifyDiskChecksum, compositeFetch, inputContext);
{code}
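With the counters behind an interface, the six error-counter arguments in the call above could collapse into a single collaborator. The sketch below is a simplified, hypothetical fetcher: SketchFetcher, FetchError and FetchErrorSink are illustrative names, not Tez APIs, and only the constructor shape and one error path are shown.

```java
// Mirrors the six ShuffleErrors counters that FetcherOrderedGrouped currently receives.
enum FetchError { IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP, CONNECTION, WRONG_REDUCE }

// Hypothetical interface replacing the individual counter parameters.
@FunctionalInterface
interface FetchErrorSink {
  void onError(FetchError error);
}

// Simplified, hypothetical fetcher: one collaborator instead of ioErrsCounter,
// wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, ...
final class SketchFetcher {
  private final String mapHost;
  private final FetchErrorSink errors;

  SketchFetcher(String mapHost, FetchErrorSink errors) {
    this.mapHost = mapHost;
    this.errors = errors;
  }

  // Simplified stand-in for a length check on fetched data: reports a mismatch
  // through the sink instead of incrementing a counter field directly.
  boolean verifyLength(long expected, long actual) {
    if (expected != actual) {
      errors.onError(FetchError.WRONG_LENGTH);
      return false;
    }
    return true;
  }

  String mapHost() {
    return mapHost;
  }
}
```

A side benefit of depending on a narrow interface: a unit test can pass `recorded::add` for a `List<FetchError> recorded` and inspect the list afterwards, without constructing any real counters.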
was:
Both ShuffleManager and ShuffleScheduler have fields like these:
{code}
this.approximateInputRecords =
    inputContext.getCounters().findCounter(TaskCounter.APPROXIMATE_INPUT_RECORDS);
this.shuffledInputsCounter =
    inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
this.failedShufflesCounter =
    inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
this.bytesShuffledCounter =
    inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
this.decompressedDataSizeCounter =
    inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
this.bytesShuffledToDiskCounter =
    inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
this.bytesShuffledToMemCounter =
    inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
this.bytesShuffledDirectDiskCounter =
    inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
{code}
We should collect them into a separate class such as "ShuffleCounters", initialized
from an inputContext, and increment them through an interface.
> Move shuffle counters to a common class
> ---------------------------------------
>
> Key: TEZ-4499
> URL: https://issues.apache.org/jira/browse/TEZ-4499
> Project: Apache Tez
> Issue Type: Sub-task
> Reporter: László Bodor
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)