[ https://issues.apache.org/jira/browse/CRUNCH-218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13681966#comment-13681966 ]

Dave Beech commented on CRUNCH-218:
-----------------------------------

Josh - thanks for putting a patch together so quickly! I'll test it out myself 
this morning but just to confirm what Gabriel said, the idea is definitely to 
avoid running MyExpensiveOperation if the data at 'processed' exists, and in 
general, to only run MyExpensiveOperation as many times as strictly necessary. 

At the moment, to achieve this in a single pipeline run, I might have to put a
'run' call after the first write:

    PCollection<String> words = ...;
    PCollection<String> processedWords =
        words.parallelDo(new MyExpensiveOperation(), Writables.strings());
    // text output is only for illustration; any Target would do here
    pipeline.write(processedWords, To.textFile("processed"), WriteMode.SKIP_IF_EXISTS);
    pipeline.run(); // RUN PIPELINE UP TO WRITE OF "processed"
    PCollection<Pair<String, Long>> counted = processedWords.count();
    pipeline.write(counted, To.textFile("counted"));

I think this would only run MyExpensiveOperation once. But if I then run the
same pipeline again, "processed" already exists, so I don't want to run
MyExpensiveOperation _at all_.
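A minimal plain-Java sketch of the behaviour described above, assuming the local filesystem stands in for HDFS. None of this is Crunch API: SkipIfExistsSketch, checkpoint(), runPipeline() and the upper-casing stand-in for MyExpensiveOperation are all hypothetical. If "processed" already exists it is read back instead of recomputed, so the expensive step runs once across two runs of the same pipeline.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class SkipIfExistsSketch {
    // Counts how many times the stand-in expensive step actually runs.
    static final AtomicInteger expensiveCalls = new AtomicInteger();

    // Hypothetical stand-in for MyExpensiveOperation: upper-cases each word.
    static List<String> myExpensiveOperation(List<String> words) {
        expensiveCalls.incrementAndGet();
        return words.stream().map(String::toUpperCase).collect(Collectors.toList());
    }

    // Skip-if-exists checkpoint: reuse the output file when it is already
    // present, otherwise compute the data and persist it for the next run.
    static List<String> checkpoint(Path output, Supplier<List<String>> compute) {
        try {
            if (Files.exists(output)) {
                return Files.readAllLines(output); // upstream work is skipped entirely
            }
            List<String> result = compute.get();
            Files.write(output, result);
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // One "pipeline run": words -> processed (checkpointed) -> counted.
    static Map<String, Long> runPipeline(Path dir, List<String> words) {
        List<String> processed =
            checkpoint(dir.resolve("processed"), () -> myExpensiveOperation(words));
        return processed.stream()
            .collect(Collectors.groupingBy(w -> w, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("crunch218");
        List<String> words = List.of("a", "b", "a");
        System.out.println(runPipeline(dir, words)); // computes and writes "processed"
        System.out.println(runPipeline(dir, words)); // reads "processed" back
        System.out.println("expensive calls: " + expensiveCalls.get());
    }
}
```

Run against a fresh temp directory, both runs should print the same counts while the expensive step is invoked only once; a real SKIP_IF_EXISTS mode would need the planner to make the same decision before any MapReduce job is scheduled.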


                
> Add new Target.WriteMode to skip the write and continue pipeline if an output target exists
> -------------------------------------------------------------------------------------------
>
>                 Key: CRUNCH-218
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-218
>             Project: Crunch
>          Issue Type: Improvement
>          Components: Core
>    Affects Versions: 0.6.0
>            Reporter: Dave Beech
>            Assignee: Josh Wills
>            Priority: Minor
>         Attachments: CRUNCH-218b.patch, CRUNCH-218.patch
>
>
> Quite often I write pipelines which persist data to the filesystem midway 
> through the process, and then carry on doing further work. 
> If this intermediate data is already present, I think it would be good if I 
> could set a write mode which skips over this first half of processing. This 
> way I'd avoid running jobs unnecessarily and wasting cluster resources 
> regenerating data I already have. 
> Example:
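> // aType, bType, cType: placeholders for the PTypes that parallelDo requires
> PCollection<B> inter = pipeline.read(source)
>     .parallelDo(something, aType)
>     .parallelDo(somethingElse, bType);
> inter.write(At.sequenceFile("output", inter.getPType()), WriteMode.SKIP_IF_EXISTS);
> PCollection<C> result = inter.parallelDo(moreWork, cType);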
>  
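As a rough model of how the proposed mode would sit alongside the existing ones, here is a toy decision function in plain Java. It is not Crunch code: the WriteMode values other than SKIP_IF_EXISTS are assumed to mirror Target.WriteMode in 0.6, and the Action names and plan() method are made up for illustration.

```java
public class WriteModeSketch {
    // Toy copy of the write modes, plus the SKIP_IF_EXISTS mode proposed in this issue.
    enum WriteMode { DEFAULT, OVERWRITE, APPEND, SKIP_IF_EXISTS }

    // What a planner could decide to do with a single target.
    enum Action { RUN_AND_WRITE, FAIL, DELETE_THEN_WRITE, WRITE_ALONGSIDE, SKIP_AND_READ_EXISTING }

    static Action plan(WriteMode mode, boolean outputExists) {
        if (!outputExists) {
            return Action.RUN_AND_WRITE; // nothing to clash with, so every mode just writes
        }
        switch (mode) {
            case OVERWRITE:
                return Action.DELETE_THEN_WRITE;
            case APPEND:
                return Action.WRITE_ALONGSIDE;
            case SKIP_IF_EXISTS:
                // a planner would drop upstream work that only feeds this target
                return Action.SKIP_AND_READ_EXISTING;
            default:
                return Action.FAIL; // DEFAULT refuses to touch existing output
        }
    }

    public static void main(String[] args) {
        for (WriteMode mode : WriteMode.values()) {
            System.out.println(mode + " -> " + plan(mode, true));
        }
    }
}
```

Reading the existing data back only works if the target can also act as a source, which is presumably why the example uses At.sequenceFile rather than a write-only target.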

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
