I agree, that looks very much like a common use case.
Right now, there is only support to read from HCatalog tables, but not to
write data to existing tables or create new ones.

Would be a very nice feature to add, IMO.

My guess (without having closely looked at the Hadoop HCatOutputFormat) is
that this can be done in two ways.

1) Loose integration:
- Use a Mapper to convert any data into a HCatRecord
- Emit the HCatRecord to a HCatOutputFormat wrapped into a Flink
HadoopOutputFormat.
This requires custom logic for the Mapper, but is the most versatile
solution and should more or less work out-of-the-box.

2) Tighter integration:
- Make a special Flink HCatOutputFormat that wraps the Hadoop
HCatOutputFormat but which offers special support to convert Flink Tuples
into HCatRecords.
This solution requires to check the input type of the output format against
the HCat table schema and code for the translation to HCatRecords. This is
of course nicer, if you want to emit Tuple data but more work to implement.

Best, Fabian

2015-04-20 16:24 GMT+02:00 Papp, Stefan <stefan.p...@teradata.com>:

> Hi,
>
>
> Lets take Pig as an example...
>
>         collection = LOAD 'test_data.csv' USING PigStorage(';')
>         AS (
>                 col1:chararray,
>                 col2:chararray,
>         );
>
>         # use partitions
>         STORE collection INTO 'import_table_hcat' USING
> org.apache.hcatalog.pig.HCatStorer('datestamp=20150420');
>
> How would I implement this with Flink?
>
> Let us brainstorm about the code snippet...
>
>         final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>         CsvReader csvr = env.readCsvFile(filePath);
>
>         // TODO: Get data into a data set - How to read the whole file?
>         // DataSet<Tuple2<Text, Text>> hadoopResult = csvr.
>
>
>         // TODO: Store data into Hadoop - Write to HDFS / HCatalog
>         // HadoopOutputFormat<Text, IntWritable> hadoopOF =
>                                                   // create the Flink
> wrapper.
>                                                   new
> HadoopOutputFormat<Text, IntWritable>(
>                                                     // set the Hadoop
> OutputFormat and specify the job.
>                                                     new
> TextOutputFormat<Text, IntWritable>(), job
>                                                   );
>
> hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator",
> " ");
>
> TextOutputFormat.setOutputPath(job, new Path(outputPath));
>
>                                                 // Emit data using the
> Hadoop TextOutputFormat.
>
> hadoopResult.output(hadoopOF);
>
>
> My idea is: If I create the tables in HCatalog in advance, I might add
> them by writing to HDFS Hive directory. Any thoughts on this?
>
> Stefan
>
>
>
> -----Original Message-----
> From: Robert Metzger [mailto:rmetz...@apache.org]
> Sent: Monday, April 20, 2015 3:22 PM
> To: dev@flink.apache.org
> Subject: Re: Hadoop ETLing with Flink
>
> Hi Stefan,
>
> you can use Flink to load data into HDFS.
> The CSV reader is suited for reading delimiter separated text files into
> the system. But you can also read data from a lot of other sources (avro,
> jdbc, mongodb, hcatalog).
>
> We don't have any utilities to make writing to HCatalog very easy, but you
> can certainly write to HCatalog with Flink's Hadoop OutputFormat wrappers:
>
> http://ci.apache.org/projects/flink/flink-docs-master/hadoop_compatibility.html#using-hadoop-outputformats
>
> Here is some documentation on how to use the Hcatalog output format:
> https://cwiki.apache.org/confluence/display/Hive/HCatalog+InputOutput
>
> You probably have to do something like:
>
> HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName,
> outputTableName, null)); HCatSchema s =
> HCatOutputFormat.getTableSchema(job);
> HCatOutputFormat.setSchema(job, s);
>
>
>
> Let me know if you need more help writing to Hcatalog.
>
>
>
>
> On Mon, Apr 20, 2015 at 1:29 PM, Papp, Stefan <stefan.p...@teradata.com>
> wrote:
>
> > Hi,
> >
> >
> > I want  load CSV files into a Hadoop cluster. How could I do that with
> > Flink?
> >
> > I know, I can load data into a CsvReader and then iterate over rows
> > and transform them. Is there an easy way to store the results into
> > HDFS+HCatalog within Flink?
> >
> > Thank you!
> >
> > Stefan Papp
> > Lead Hadoop Consultant
> >
> > Teradata GmbH
> > Mobile: +43 664 22 08 616
> > stefan.p...@teradata.com<mailto:stefan.p...@teradata.com>
> > teradata.com<http://www.teradata.com/>
> >
> > This e-mail is from Teradata Corporation and may contain information
> > that is confidential or proprietary. If you are not the intended
> > recipient, do not read, copy or distribute the e-mail or any
> > attachments. Instead, please notify the sender and delete the e-mail and
> any attachments. Thank you.
> > Please consider the environment before printing.
> >
> >
>

Reply via email to