For clarification: I wrote "Right now, there is only support to read from
HCatalog tables, but not to write data to existing tables or create new
ones."

I think this is not correct. You should be able to write to HCatalog using
the regular HadoopOutputFormat wrapper, as described in the "loose
integration" option below. As far as I know, this has not been tried
before, though, so it would be nice to know whether it actually works.
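Untested, but I would expect the wiring to look roughly like the sketch
below. Package names assume Hive's old org.apache.hcatalog namespace and
Flink's mapreduce HadoopOutputFormat wrapper (adjust both to your
versions); "mydb", "mytable", and the two-field schema are placeholders,
and hive-site.xml must be on the classpath so the metastore can be found.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class HCatWriteSketch {

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // some two-field data; in practice this would come from a CSV file etc.
    DataSet<Tuple2<String, String>> input =
        env.fromElements(new Tuple2<String, String>("a", "b"));

    // configure the Hadoop HCatOutputFormat on a Job, exactly as in a
    // plain MapReduce program ("mydb" / "mytable" are placeholders)
    Job job = Job.getInstance();
    HCatOutputFormat.setOutput(job,
        OutputJobInfo.create("mydb", "mytable", null));
    HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job));

    // the "Mapper": convert each element into a HCatRecord; the key is
    // ignored by HCatOutputFormat, so NullWritable will do
    DataSet<Tuple2<WritableComparable<?>, HCatRecord>> records = input.map(
        new MapFunction<Tuple2<String, String>,
                        Tuple2<WritableComparable<?>, HCatRecord>>() {
          @Override
          public Tuple2<WritableComparable<?>, HCatRecord> map(
              Tuple2<String, String> v) {
            HCatRecord rec = new DefaultHCatRecord(2); // matches table schema
            rec.set(0, v.f0);
            rec.set(1, v.f1);
            return new Tuple2<WritableComparable<?>, HCatRecord>(
                NullWritable.get(), rec);
          }
        });

    // emit through the Flink wrapper around the Hadoop output format
    records.output(new HadoopOutputFormat<WritableComparable<?>, HCatRecord>(
        new HCatOutputFormat(), job));

    env.execute("write to HCatalog");
  }
}

If this works, packaging the Job setup and the record conversion into a
dedicated Flink HCatOutputFormat would basically be option 2) below.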
2015-04-20 16:44 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> I agree, that looks very much like a common use case.
> Right now, there is only support to read from HCatalog tables, but not to
> write data to existing tables or create new ones.
>
> Would be a very nice feature to add, IMO.
>
> My guess (without having looked closely at the Hadoop HCatOutputFormat)
> is that this can be done in two ways.
>
> 1) Loose integration:
> - Use a Mapper to convert any data into a HCatRecord.
> - Emit the HCatRecord to a HCatOutputFormat wrapped in a Flink
>   HadoopOutputFormat.
> This requires custom logic for the Mapper, but it is the most versatile
> solution and should more or less work out of the box.
>
> 2) Tighter integration:
> - Make a special Flink HCatOutputFormat that wraps the Hadoop
>   HCatOutputFormat but offers special support for converting Flink
>   Tuples into HCatRecords.
> This solution requires checking the input type of the output format
> against the HCat table schema, plus code for the translation to
> HCatRecords. It is of course nicer if you want to emit Tuple data, but
> more work to implement.
>
> Best,
> Fabian
>
> 2015-04-20 16:24 GMT+02:00 Papp, Stefan <stefan.p...@teradata.com>:
>
>> Hi,
>>
>> Let's take Pig as an example:
>>
>> collection = LOAD 'test_data.csv' USING PigStorage(';')
>>     AS (
>>         col1:chararray,
>>         col2:chararray
>>     );
>>
>> -- use partitions
>> STORE collection INTO 'import_table_hcat' USING
>>     org.apache.hcatalog.pig.HCatStorer('datestamp=20150420');
>>
>> How would I implement this with Flink?
>>
>> Let us brainstorm about the code snippet...
>>
>> final ExecutionEnvironment env =
>>     ExecutionEnvironment.getExecutionEnvironment();
>> CsvReader csvr = env.readCsvFile(filePath);
>>
>> // TODO: Get data into a data set - how to read the whole file?
>> // DataSet<Tuple2<Text, Text>> hadoopResult = csvr.
>>
>> // TODO: Store data into Hadoop - write to HDFS / HCatalog
>> // create the Flink wrapper, set the Hadoop OutputFormat, and specify
>> // the job
>> HadoopOutputFormat<Text, IntWritable> hadoopOF =
>>     new HadoopOutputFormat<Text, IntWritable>(
>>         new TextOutputFormat<Text, IntWritable>(), job);
>>
>> hadoopOF.getConfiguration().set(
>>     "mapreduce.output.textoutputformat.separator", " ");
>> TextOutputFormat.setOutputPath(job, new Path(outputPath));
>>
>> // emit data using the Hadoop TextOutputFormat
>> hadoopResult.output(hadoopOF);
>>
>> My idea is: if I create the tables in HCatalog in advance, I might add
>> the data by writing to the Hive directory in HDFS. Any thoughts on this?
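>>
>> Thinking out loud, maybe the TODOs could be filled in like this
>> (untested on my side; it assumes the file has two string columns, and
>> the two paths are placeholders):
>>
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.mapreduce.Job;
>> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
>>
>> final ExecutionEnvironment env =
>>     ExecutionEnvironment.getExecutionEnvironment();
>> Job job = Job.getInstance();
>>
>> String filePath = "hdfs:///path/to/test_data.csv";  // placeholder
>> String outputPath = "hdfs:///path/to/output";       // placeholder
>>
>> // read the whole file into a DataSet of 2-tuples
>> DataSet<Tuple2<String, String>> csvInput = env.readCsvFile(filePath)
>>     .fieldDelimiter(';')
>>     .types(String.class, String.class);
>>
>> // convert to Hadoop's Text, since TextOutputFormat expects Writables
>> DataSet<Tuple2<Text, Text>> hadoopResult = csvInput.map(
>>     new MapFunction<Tuple2<String, String>, Tuple2<Text, Text>>() {
>>       @Override
>>       public Tuple2<Text, Text> map(Tuple2<String, String> v) {
>>         return new Tuple2<Text, Text>(new Text(v.f0), new Text(v.f1));
>>       }
>>     });
>>
>> // create the Flink wrapper around Hadoop's TextOutputFormat
>> HadoopOutputFormat<Text, Text> hadoopOF =
>>     new HadoopOutputFormat<Text, Text>(
>>         new TextOutputFormat<Text, Text>(), job);
>> hadoopOF.getConfiguration().set(
>>     "mapreduce.output.textoutputformat.separator", ";");
>> TextOutputFormat.setOutputPath(job, new Path(outputPath));
>>
>> hadoopResult.output(hadoopOF);
>> env.execute();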
>>
>> Stefan
>>
>>
>> -----Original Message-----
>> From: Robert Metzger [mailto:rmetz...@apache.org]
>> Sent: Monday, April 20, 2015 3:22 PM
>> To: dev@flink.apache.org
>> Subject: Re: Hadoop ETLing with Flink
>>
>> Hi Stefan,
>>
>> you can use Flink to load data into HDFS.
>> The CSV reader is suited for reading delimiter-separated text files
>> into the system. But you can also read data from a lot of other sources
>> (Avro, JDBC, MongoDB, HCatalog).
>>
>> We don't have any utilities to make writing to HCatalog very easy, but
>> you can certainly write to HCatalog with Flink's Hadoop OutputFormat
>> wrappers:
>> http://ci.apache.org/projects/flink/flink-docs-master/hadoop_compatibility.html#using-hadoop-outputformats
>>
>> Here is some documentation on how to use the HCatalog output format:
>> https://cwiki.apache.org/confluence/display/Hive/HCatalog+InputOutput
>>
>> You probably have to do something like:
>>
>> HCatOutputFormat.setOutput(job,
>>     OutputJobInfo.create(dbName, outputTableName, null));
>> HCatSchema s = HCatOutputFormat.getTableSchema(job);
>> HCatOutputFormat.setSchema(job, s);
>>
>> Let me know if you need more help writing to HCatalog.
>>
>> On Mon, Apr 20, 2015 at 1:29 PM, Papp, Stefan <stefan.p...@teradata.com>
>> wrote:
>>
>> > Hi,
>> >
>> > I want to load CSV files into a Hadoop cluster. How could I do that
>> > with Flink?
>> >
>> > I know I can load data into a CsvReader and then iterate over rows
>> > and transform them. Is there an easy way to store the results into
>> > HDFS + HCatalog within Flink?
>> >
>> > Thank you!
>> >
>> > Stefan Papp
>> > Lead Hadoop Consultant
>> >
>> > Teradata GmbH
>> > Mobile: +43 664 22 08 616
>> > stefan.p...@teradata.com<mailto:stefan.p...@teradata.com>
>> > teradata.com<http://www.teradata.com/>
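>>
>> P.S. If the target table is partitioned, the third argument of
>> OutputJobInfo.create() takes a Map of partition key/values instead of
>> null. Rough, untested example (the partition key and value are made up,
>> and it also needs java.util.HashMap / java.util.Map):
>>
>> Map<String, String> partition = new HashMap<String, String>();
>> partition.put("datestamp", "20150420");
>>
>> HCatOutputFormat.setOutput(job,
>>     OutputJobInfo.create(dbName, outputTableName, partition));
>> HCatSchema s = HCatOutputFormat.getTableSchema(job);
>> HCatOutputFormat.setSchema(job, s);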