[ 
https://issues.apache.org/jira/browse/CRUNCH-636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Attila Sasvari updated CRUNCH-636:
----------------------------------
    Attachment: test.WordCount_2017-03-08_16.31.55.737.log
                test.WordCount_2017-03-08_16.31.55.737_jobplan.dot.png

I believe I have found a way to handle the issue. Previously, I set the 
replication in the wrong place, in {{MSCROutputHandler}}. I cannot simply rely 
on the target path; we need to make sure that a job has only temporary output 
files before we apply the supplied replication factor. 

If a job has both temporary and final output files, then we should use the 
default / initial dfs.replication in the MR job. We can make this decision in 
the {{build()}} method of {{JobPrototype}}. Before  
https://github.com/apache/crunch/blob/7f85ee5816a19eca0e87ce503ea0b03ea294433c/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java#L155
 , we create a boolean that tracks whether the job at hand has only temporary 
output files. If there is any non-temporary output, we set it to false inside 
the loop. Based on this flag, we can set the proper replication after the loop.
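The flag-based decision above could be sketched roughly like this (a minimal, self-contained sketch; the class, method, and constant names are hypothetical and not the actual Crunch code):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the decision in JobPrototype#build(): apply the
// user-supplied temporary replication only when every output of the job
// is a temporary path.
public class ReplicationDecision {
    static final short DEFAULT_REPLICATION = 3; // initial dfs.replication
    static final short TMP_REPLICATION = 9;     // crunch.tmp.dir.replication

    static short chooseReplication(List<String> outputPaths, String tmpDir) {
        boolean onlyTemporaryOutputs = true; // flips to false on any final target
        for (String path : outputPaths) {
            if (!path.startsWith(tmpDir)) {
                onlyTemporaryOutputs = false;
            }
        }
        return onlyTemporaryOutputs ? TMP_REPLICATION : DEFAULT_REPLICATION;
    }

    public static void main(String[] args) {
        String tmp = "/tmp/crunch-1543087915";
        // Job with only a temporary output: temporary replication applies.
        System.out.println(chooseReplication(Arrays.asList(tmp + "/p2"), tmp));
        // Job with a temporary and a final output: keep dfs.replication.
        System.out.println(chooseReplication(
                Arrays.asList(tmp + "/p1", "/user/asasvari/5FXm2"), tmp));
    }
}
```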

Regarding user configuration, I think we should allow whatever they set: if 
MapReduce allows it, we allow it. 
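For illustration, a user would set the proposed property on the pipeline's Hadoop Configuration like any other job setting (a sketch only; {{crunch.tmp.dir.replication}} is the setting proposed in this issue, and the surrounding class names are just an example):

```java
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.hadoop.conf.Configuration;

// Sketch: configure the proposed replication factor for temporary files
// before constructing the pipeline. Requires hadoop-common and crunch-core
// on the classpath.
Configuration conf = new Configuration();
conf.setInt("crunch.tmp.dir.replication", 9); // replication for temporary outputs
Pipeline pipeline = new MRPipeline(WordCount.class, conf);
```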

I have attached an example jobplan and a debug log to show what is happening 
right now.

Walking through the execution: 
- At the beginning, we see Crunch create all the temporary directories:
{code}
17/03/08 17:05:56 INFO dist.DistributedPipeline: temporary directory created: 
/tmp/crunch-1708944155/p1
17/03/08 17:05:56 INFO dist.DistributedPipeline: temporary directory created: 
/tmp/crunch-1708944155/p2
17/03/08 17:05:56 INFO dist.DistributedPipeline: temporary directory created: 
/tmp/crunch-1708944155/p3
17/03/08 17:05:56 INFO dist.DistributedPipeline: temporary directory created: 
/tmp/crunch-1708944155/p4
17/03/08 17:05:56 INFO dist.DistributedPipeline: temporary directory created: 
/tmp/crunch-1708944155/p5
{code}

- We remember these paths so we can decide whether an output file is 
temporary or not.

- Then we check the replication factor currently set for the job and adjust it 
if we are dealing with temporary-only job output:
{code}
Replication factor: 3
17/03/08 16:31:55 INFO plan.JobPrototype: --- target 
:Text(/user/asasvari/5FXm_union_count)
17/03/08 16:31:55 INFO plan.MSCROutputHandler: Target: 
Text(/user/asasvari/5FXm_union_count)
17/03/08 16:31:55 INFO plan.JobPrototype: Setting initial replication factor (3)
Replication factor: 3
17/03/08 16:31:55 INFO plan.JobPrototype: --- target 
:SeqFile(/tmp/crunch-1543087915/p2)
17/03/08 16:31:55 INFO plan.MSCROutputHandler: Target: 
SeqFile(/tmp/crunch-1543087915/p2)
17/03/08 16:31:55 INFO plan.JobPrototype: Setting replication factor to: 9 
Replication factor: 3
17/03/08 16:31:55 INFO plan.JobPrototype: --- target :Text(/user/asasvari/5FXm2)
17/03/08 16:31:55 INFO plan.MSCROutputHandler: Target: 
Text(/user/asasvari/5FXm2)
17/03/08 16:31:55 INFO plan.JobPrototype: --- target 
:SeqFile(/tmp/crunch-1543087915/p1)
17/03/08 16:31:55 INFO plan.MSCROutputHandler: Target: 
SeqFile(/tmp/crunch-1543087915/p1)
17/03/08 16:31:55 INFO plan.JobPrototype: Setting initial replication factor (3)
{code}

* Crunch Job 3 creates only a temporary file, {{/tmp/crunch-1543087915/p2}}, so 
we set the replication factor to 9 (the user-specified value of 
{{crunch.tmp.dir.replication}}). 
{code}
$ hdfs dfs -ls  /tmp/crunch-1543087915/p2        
17/03/08 17:44:13 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 asasvari supergroup          0 2017-03-08 16:32 
/tmp/crunch-1543087915/p2/_SUCCESS
-rw-r--r--   9 asasvari supergroup       1183 2017-03-08 16:32 
/tmp/crunch-1543087915/p2/part-r-00000
{code}
* Crunch Job 2 produces both a temporary and a non-temporary file, so we must 
reset the replication factor to the original {{dfs.replication}}.
* Crunch Job 1 has one non-temporary output file, so {{dfs.replication}} is used.
{code}
$ hdfs dfs -ls  /user/asasvari/iwTV_union_count 
17/03/08 17:45:54 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 asasvari supergroup          0 2017-03-08 17:06 
/user/asasvari/iwTV_union_count/_SUCCESS
-rw-r--r--   3 asasvari supergroup        921 2017-03-08 17:06 
/user/asasvari/iwTV_union_count/part-r-00000
{code}
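The classification of outputs in the walkthrough above depends on remembering which directories the pipeline created as temporary. A minimal sketch of that bookkeeping (class and method names are hypothetical, not the actual Crunch code):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: record the temporary directories created by
// DistributedPipeline so that any output path can later be classified
// as temporary or final.
public class TempPathTracker {
    private final Set<String> tempDirs = new HashSet<>();

    // Called when the pipeline creates a temporary directory.
    void remember(String dir) {
        tempDirs.add(dir);
    }

    // An output is temporary if it lives under any remembered directory.
    boolean isTemporary(String outputPath) {
        for (String dir : tempDirs) {
            if (outputPath.startsWith(dir)) {
                return true;
            }
        }
        return false;
    }
}
```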

I had some other questions regarding the failure of a big pipeline: 
- Let's assume the replication factor is set to 1 for temporary files, and we 
have a huge pipeline with hundreds of nodes. If a node that stores temporary 
data fails very near the end of pipeline execution, the last job cannot 
finish, so the pipeline fails. No rollback actions are performed to delete the 
outputs written by earlier jobs. With checkpointing enabled, previously 
successful jobs do not need to be re-run.

- Does it make sense to allow setting the replication factor for temporary 
files higher than the initial value? Probably not, but we can allow it. 



> Make replication factor for temporary files configurable
> --------------------------------------------------------
>
>                 Key: CRUNCH-636
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-636
>             Project: Crunch
>          Issue Type: New Feature
>            Reporter: Attila Sasvari
>            Assignee: Attila Sasvari
>         Attachments: test.WordCount_2017-03-08_16.31.55.737_jobplan.dot.png, 
> test.WordCount_2017-03-08_16.31.55.737.log
>
>
> As of now, Crunch does not allow having different replication factors for 
> temporary files and non-temporary files (e.g. final output data of leaf 
> nodes) at the same time. If a user has a large amount of data (say hundreds 
> of gigabytes) to process, they might want to have a lower replication factor 
> for large temporary files between Crunch jobs. 
> We could make this configurable via a new setting (e.g. 
> {{crunch.tmp.dir.replication}}).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
