Thanks, Timo. I have not used or explored the Table API until now; I have only used the DataSet and DataStream APIs. I will read up on the Table API.
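
If I understand the suggestion correctly, the Hive-connector route on Flink 1.10 would look roughly like the sketch below. This is untested, and the catalog settings, table names, and partition columns are only placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Blink planner in batch mode (Flink 1.10)
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Register a Hive catalog; the name, database, conf dir and Hive version are placeholders
HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/path/to/hive-conf", "2.3.4");
tableEnv.registerCatalog("myhive", hiveCatalog);
tableEnv.useCatalog("myhive");

// Assumes events_in and events_out already exist in the metastore as Parquet tables,
// with events_out partitioned. Static partition values shown here; the Hive connector
// docs describe the syntax for dynamic partitions.
tableEnv.sqlUpdate(
        "INSERT INTO events_out PARTITION (yr='2020', mon='02', dy='19') "
                + "SELECT id, payload FROM events_in");
tableEnv.execute("parquet-via-hive");

If that works, the year/month/day partitioning would be handled by the table's partition columns rather than by a custom sink.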

On Wed, Feb 19, 2020 at 4:33 PM Timo Walther <twal...@apache.org> wrote:

> Hi Anuj,
>
> Another option would be to use the new Hive connectors. Have you looked
> into those? They might work on SQL-internal data types, which is why you
> would need to use the Table API then.
>
> Maybe Bowen, in CC, can help you here.
>
> Regards,
> Timo
>
> On 19.02.20 11:14, Rafi Aroch wrote:
> > Hi Anuj,
> >
> > It's been a while since I wrote this (Flink 1.5.2). There could be a
> > better/newer way, but this is how I read & write Parquet with
> > hadoop-compatibility:
> >
> > // Imports
> > import org.apache.avro.Schema;
> > import org.apache.avro.generic.GenericRecord;
> > import org.apache.flink.api.java.ExecutionEnvironment;
> > import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
> > import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
> > import org.apache.flink.api.java.operators.DataSource;
> > import org.apache.flink.api.java.tuple.Tuple2;
> > import org.apache.flink.hadoopcompatibility.HadoopInputs;
> > import org.apache.hadoop.conf.Configuration;
> > import org.apache.hadoop.fs.Path;
> > import org.apache.hadoop.mapreduce.Job;
> > import org.apache.parquet.avro.AvroParquetInputFormat;
> > import org.apache.parquet.avro.AvroParquetOutputFormat;
> > import org.apache.parquet.hadoop.metadata.CompressionCodecName;
> >
> > // Create the Parquet input format (recursive read of all files under the input paths)
> > Configuration conf = new Configuration();
> > Job job = Job.getInstance(conf);
> > AvroParquetInputFormat<GenericRecord> parquetInputFormat = new AvroParquetInputFormat<>();
> > AvroParquetInputFormat.setInputDirRecursive(job, true);
> > AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
> > HadoopInputFormat<Void, GenericRecord> inputFormat =
> >     HadoopInputs.createHadoopInput(parquetInputFormat, Void.class, GenericRecord.class, job);
> >
> > // Create the Parquet output format (Snappy-compressed, using the Avro schema of the output records)
> > AvroParquetOutputFormat<GenericRecord> parquetOutputFormat = new AvroParquetOutputFormat<>();
> > AvroParquetOutputFormat.setSchema(job, new Schema.Parser().parse(SomeEvent.SCHEMA));
> > AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
> > AvroParquetOutputFormat.setCompressOutput(job, true);
> > AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
> > HadoopOutputFormat<Void, GenericRecord> outputFormat =
> >     new HadoopOutputFormat<>(parquetOutputFormat, job);
> >
> > // Read the input as a DataSet of (Void, GenericRecord) tuples
> > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> > DataSource<Tuple2<Void, GenericRecord>> inputFileSource = env.createInput(inputFormat);
> >
> > // Start processing... (transformations that produce resultDataSet)
> >
> > // Write the result as Parquet
> > resultDataSet.output(outputFormat);
> >
> > Regarding writing partitioned data: as far as I know, there is no way to
> > achieve that with the DataSet API and hadoop-compatibility.
> >
> > You could implement it by reading the input files as a stream and then
> > using a StreamingFileSink with a custom BucketAssigner [1].
> > The problem with that (which, AFAIK, is not yet resolved) is described
> > here [2] under "Important Notice 2".
> >
> > Sadly, for this use case I eventually chose Spark to do the job...
> >
> > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
> > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#general
> >
> > Hope this helps.
> >
> > Rafi
> >
> > On Sat, Feb 15, 2020 at 5:03 PM aj <ajainje...@gmail.com> wrote:
> >
> >     Hi Rafi,
> >
> >     I have a similar use case where I want to read Parquet files into a
> >     DataSet, perform some transformations, and write the result
> >     partitioned by year, month, and day.
> >
> >     I am stuck at the first step: how to read and write
> >     Parquet files using hadoop-compatibility.
> >     Please help me with this, and also let me know if you find a solution
> >     for writing the data partitioned.
> >
> >     Thanks,
> >     Anuj
> >
> >     On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
> >
> >         Hi Rafi,
> >
> >         At the moment I do not see any support for Parquet in the DataSet API
> >         except HadoopOutputFormat, as mentioned in the Stack Overflow question.
> >         I have cc'ed Fabian and Aljoscha; maybe they can provide more
> >         information.
> >
> >         Best,
> >         Andrey
> >
> >>         On 25 Oct 2018, at 13:08, Rafi Aroch <rafi.ar...@gmail.com> wrote:
> >>
> >>         Hi,
> >>
> >>         I'm writing a batch job which reads Parquet, does some
> >>         aggregations, and writes back as Parquet files.
> >>         I would like the output to be partitioned by year, month, and day
> >>         of the event time, similar to the functionality of the BucketingSink.
> >>
> >>         I was able to achieve the reading/writing to/from Parquet by
> >>         using the hadoop-compatibility features.
> >>         I couldn't find a way to partition the data by year, month, and
> >>         day to create a folder hierarchy accordingly. Everything is
> >>         written to a single directory.
> >>
> >>         I found an unanswered question about this issue:
> >>         https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit
> >>
> >>         Can anyone suggest a way to achieve this? Maybe there's a way
> >>         to integrate the BucketingSink with the DataSet API? Another
> >>         solution?
> >>
> >>         Rafi
> >
> >
> > --
> > Thanks & Regards,
> > Anuj Jain
> > Mob.: +91-8588817877
> > Skype: anuj.jain07
> > <http://www.oracle.com/>
> >
> > <http://www.cse.iitm.ac.in/%7Eanujjain/>


--
Thanks & Regards,
Anuj Jain
Mob.: +91-8588817877
Skype: anuj.jain07
<http://www.oracle.com/>

<http://www.cse.iitm.ac.in/%7Eanujjain/>
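
P.S. For the partitioning part, this is roughly how I read Rafi's StreamingFileSink + custom BucketAssigner suggestion above. It is untested, and the event-time field name ("eventTime", epoch millis), the output path, and the schema/stream variables are only placeholders:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Buckets each record into a year=.../month=.../day=... folder based on an event-time field.
public class EventTimeBucketAssigner implements BucketAssigner<GenericRecord, String> {

    private static final DateTimeFormatter BUCKET_FORMAT =
            DateTimeFormatter.ofPattern("'year='yyyy'/month='MM'/day='dd").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(GenericRecord element, Context context) {
        // "eventTime" (epoch millis) is a placeholder for the real timestamp field
        long eventTimeMillis = (Long) element.get("eventTime");
        return BUCKET_FORMAT.format(Instant.ofEpochMilli(eventTimeMillis));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

And then wiring it into the job (schema is the Avro schema, events is a DataStream<GenericRecord>):

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("s3://some-bucket/output"), ParquetAvroWriters.forGenericRecord(schema))
        .withBucketAssigner(new EventTimeBucketAssigner())
        .build();
events.addSink(sink);

The caveat Rafi points to under [2] would still apply here, of course.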