A simple solution would be to:

1 - add a constructor to HadoopOutputFormatBase and HadoopOutputFormat
that takes the OutputCommitter as a parameter
2 - change the committer field of HadoopOutputFormatBase (currently
fileOutputCommitter) to the generic OutputCommitter type
3 - remove the hard-coded assignment of a new FileOutputCommitter() in
open() and finalizeGlobal(), or keep it only as the default when no
committer has been specified.
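
As a rough sketch of the three steps above: this is plain Java with no
Hadoop dependency, and Committer, RecordingCommitter and SketchOutputFormat
are hypothetical stand-ins for OutputCommitter, the concrete committers
(FileOutputCommitter, MongoOutputCommitter) and HadoopOutputFormatBase, not
the real classes. The idea would be something like:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Hadoop's OutputCommitter; NOT the real API.
interface Committer {
    boolean needsTaskCommit();
    void commitTask();
    void commitJob();
}

// Hypothetical committer that only records which calls it received.
class RecordingCommitter implements Committer {
    final String label;
    final List<String> calls = new ArrayList<>();

    RecordingCommitter(String label) {
        this.label = label;
    }

    @Override
    public boolean needsTaskCommit() {
        return true;
    }

    @Override
    public void commitTask() {
        calls.add(label + ":commitTask");
    }

    @Override
    public void commitJob() {
        calls.add(label + ":commitJob");
    }
}

// Hypothetical stand-in for HadoopOutputFormatBase.
class SketchOutputFormat {
    // Step 2: the field has the generic committer type.
    private final Committer outputCommitter;

    // Step 3: the file-based committer stays as the default only.
    SketchOutputFormat() {
        this(new RecordingCommitter("file"));
    }

    // Step 1: the committer can be passed in by the caller.
    SketchOutputFormat(Committer outputCommitter) {
        if (outputCommitter == null) {
            throw new IllegalArgumentException("outputCommitter must not be null");
        }
        this.outputCommitter = outputCommitter;
    }

    // Task-level commit goes through whichever committer was supplied.
    void close() {
        if (outputCommitter.needsTaskCommit()) {
            outputCommitter.commitTask();
        }
    }

    // Job-level commit reuses the same committer instead of creating a new one.
    void finalizeGlobal() {
        outputCommitter.commitJob();
    }

    Committer getOutputCommitter() {
        return outputCommitter;
    }
}
```

With this shape, passing a Mongo-specific committer to the constructor
would route both close() and finalizeGlobal() through it, while existing
callers that pass nothing keep the file-based behaviour.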

Regards,
Stefano

2015-07-22 16:48 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:

> In fact, in close() of the HadoopOutputFormat the task commit is skipped,
> because this.fileOutputCommitter.needsTaskCommit(this.context) returns
> false.
>
>     /**
>      * commit the task by moving the output file out from the temporary directory.
>      * @throws java.io.IOException
>      */
>     @Override
>     public void close() throws IOException {
>         this.recordWriter.close(new HadoopDummyReporter());
>
>         if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
>             this.fileOutputCommitter.commitTask(this.context);
>         }
>     }
>
>
> Also, both close() and finalizeGlobal() use a FileOutputCommitter, and
> never the MongoOutputCommitter:
>
>     @Override
>     public void finalizeGlobal(int parallelism) throws IOException {
>
>         try {
>             JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
>             FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();
>
>             // finalize HDFS output format
>             fileOutputCommitter.commitJob(jobContext);
>         } catch (Exception e) {
>             throw new RuntimeException(e);
>         }
>     }
>
> Could anyone have a look into that?
>
> Regards,
> Stefano
>
> 2015-07-22 15:53 GMT+02:00 Stefano Bortoli <bort...@okkam.it>:
>
>> While debugging, it seems the commitTask method of the MongoOutputCommitter
>> is never called. Is it possible that this 'bulk' approach of mongo-hadoop
>> 1.4 does not fit the task execution method of Flink?
>>
>> Any ideas? Thanks a lot in advance.
>>
>> Regards,
>> Stefano
>>
>> 2015-07-22 14:26 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:
>>
>>> Hi,
>>>
>>> I am trying to analyze and update a MongoDB collection with Apache Flink
>>> 0.9.0, Mongo Hadoop 1.4.0, and Hadoop 2.6.0.
>>>
>>> The process is fairly simple, and the MongoInputFormat works smoothly;
>>> however, nothing is written back to the collection. The processing itself
>>> works, because writeAsText produces the expected output. I am quite
>>> puzzled because, while debugging, I can see it writes to some temporary
>>> directory.
>>>
>>> The mapred.output.dir setting seems to serve only to output a file named
>>> _SUCCESS, and if I do not set it, the job fails with
>>> java.lang.IllegalArgumentException: Can not create a Path from a null string
>>>     at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>>>     at org.apache.hadoop.fs.Path.<init>(Path.java:135)
>>>     at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:108)
>>>     at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:186)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> Has anyone experienced something similar? Any hints on where to look?
>>> Thanks a lot in advance!
>>>
>>> Regards,
>>> Stefano
>>>
>>> ====================================================
>>> Configuration conf = new Configuration();
>>> conf.set("mapred.output.dir", "/tmp/");
>>> conf.set(MongoUtil.MONGO_INPUT_URI_PROPERTY, collectionsUri);
>>> conf.set(MongoUtil.MONGO_OUTPUT_URI_PROPERTY, collectionsUri);
>>>
>>> Job job = Job.getInstance(conf);
>>>
>>> // create a MongodbInputFormat, using a Hadoop input format wrapper
>>> InputFormat<Object, BSONObject> mapreduceInputFormat =
>>>         new MyMongoInputFormat<Object, BSONObject>();
>>> HadoopInputFormat<Object, BSONObject> hdIf =
>>>         new HadoopInputFormat<Object, BSONObject>(
>>>                 mapreduceInputFormat, Object.class, BSONObject.class, job);
>>>
>>> DataSet<Tuple2<Text, BSONWritable>> fin = input
>>>         .flatMap(new myFlatMapFunction()).setParallelism(16);
>>>
>>> MongoConfigUtil.setOutputURI(job.getConfiguration(), collectionsUri);
>>>
>>> fin.output(new HadoopOutputFormat<Text, BSONWritable>(
>>>         new MongoOutputFormat<Text, BSONWritable>(), job));
>>> // fin.writeAsText("/tmp/out", WriteMode.OVERWRITE);
>>>
>>>
>>
>
