TaskEnd Metrics
Hi All,

I tried to run a simple Spark program to look at the metrics collected while it executes. What I observe is that I'm able to get TaskMetrics.inputMetrics data such as records read, bytes read, etc., but I do not get any metrics about the output. I ran the code below in local mode as well as on a YARN cluster, and the result is the same.

This is the code I used to test:

    val datadf = sqlContext.read.json("schema.txt").repartition(10)
    datadf.distinct().write.mode(SaveMode.Append)
      .save("resources\\data-" + Calendar.getInstance.getTimeInMillis)

I have attached the listener like this:

    sc.addSparkListener(new SparkListener() {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        val metrics = taskEnd.taskMetrics
        if (metrics.inputMetrics.isDefined) {
          println("Metrics for the task, input records: " + metrics.inputMetrics.get.recordsRead)
          inputRecords += metrics.inputMetrics.get.recordsRead
        }
        if (metrics.outputMetrics.isDefined) {
          println("Metrics for the task, output records: " + metrics.outputMetrics.get.recordsWritten)
          outputWritten += metrics.outputMetrics.get.recordsWritten
        }
      }
    })

I get valid data for the input metrics, but none for the output metrics. Am I missing anything here? Any pointers, much appreciated.
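Independent of the missing output metrics, the Option checks above can be written without `.get` at all: `foreach` runs its body only when the value is present, which also removes the copy-paste risk between the two branches. A minimal, Spark-free sketch (the `InputMetrics`/`OutputMetrics`/`TaskMetrics` case classes here are stand-ins for illustration, not the real `org.apache.spark.executor` types):

```scala
// Stand-in case classes; the real types live in org.apache.spark.executor.
case class InputMetrics(recordsRead: Long)
case class OutputMetrics(recordsWritten: Long)
case class TaskMetrics(inputMetrics: Option[InputMetrics],
                       outputMetrics: Option[OutputMetrics])

object MetricsDemo {
  var inputRecords = 0L
  var outputWritten = 0L

  def onTaskEnd(metrics: TaskMetrics): Unit = {
    // foreach executes only when the Option is defined; no .get needed.
    metrics.inputMetrics.foreach(in => inputRecords += in.recordsRead)
    metrics.outputMetrics.foreach(out => outputWritten += out.recordsWritten)
  }

  def main(args: Array[String]): Unit = {
    onTaskEnd(TaskMetrics(Some(InputMetrics(10)), None))
    onTaskEnd(TaskMetrics(Some(InputMetrics(5)), Some(OutputMetrics(7))))
    println(s"input=$inputRecords output=$outputWritten") // input=15 output=7
  }
}
```

The same `foreach` pattern drops straight into the real listener body.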
Fwd: Adding metadata information to parquet files
Just a reminder!! Hi All, I'm trying to ingest data from Kafka as parquet files. I use Spark 1.5.2 and I'm looking for a way to store the source schema in the parquet file, the way the Avro schema gets stored as metadata when using the AvroParquetWriter. Any help much appreciated.
Adding metadata information to parquet files
Hi All, I'm trying to ingest data from Kafka as parquet files. I use Spark 1.5.2 and I'm looking for a way to store the source schema in the parquet file, the way the Avro schema gets stored as metadata when using the AvroParquetWriter. Any help much appreciated.
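One workaround worth trying (a hedged sketch, not tested against 1.5.2: `MetadataBuilder` and per-field metadata are real Spark SQL APIs, but the `"source.schema"` key name and whether downstream readers pick it up are assumptions): Spark serializes its own schema JSON, including per-field metadata, into an entry in the parquet footer, so the source schema can ride along as field metadata.

```scala
// Sketch only: assumes a SQLContext `sqlContext`, an input DataFrame `df`,
// and the source schema JSON already in `sourceSchemaJson`.
import org.apache.spark.sql.types.{MetadataBuilder, StructType}

val fieldMeta = new MetadataBuilder()
  .putString("source.schema", sourceSchemaJson) // hypothetical key name
  .build()

// Attach the metadata to every field (or just one marker field).
val schemaWithMeta = StructType(df.schema.fields.map(f => f.copy(metadata = fieldMeta)))

// Rebuild the DataFrame with the annotated schema and write as parquet;
// the metadata travels inside Spark's schema entry in the parquet footer.
sqlContext.createDataFrame(df.rdd, schemaWithMeta)
  .write.parquet("out.parquet")
```

Note this lands in Spark's own footer key, not the `avro.schema` key that AvroParquetWriter uses, so non-Spark readers would need to know where to look.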
Re: Best way to store Avro Objects as Parquet using SPARK
I should have phrased it differently: the Avro schema has additional properties, like required fields. Right now the JSON data that I have gets stored as optional fields in the parquet file. Is there a way to model the parquet file schema close to the Avro schema? I tried using sqc.read.schema(avroSchema).jsonRDD(jsonRDD).toDF(), but it has some issues with longType data. I use the code below to convert the Avro schema to a Spark-specific schema:

    def getSparkSchemaForAvro(sqc: SQLContext, avroSchema: Schema): StructType = {
      // Write an empty Avro file that carries only the schema.
      val dummyFile = File.createTempFile("avroSchema_dummy", "avro")
      val datumWriter = new GenericDatumWriter[wuser]()
      datumWriter.setSchema(avroSchema)
      val writer = new DataFileWriter(datumWriter).create(wuser.getClassSchema, dummyFile)
      writer.flush()
      writer.close()
      // Let spark-avro infer the equivalent Spark schema from the empty file.
      val df = sqc.read.format("com.databricks.spark.avro").load(dummyFile.getAbsolutePath)
      df.schema
    }

So the requirement is: how to validate the incoming data against the Avro schema and handle the bad records, apart from storing the data in parquet format with a schema matching the Avro schema that I have. The approach I have taken is to try converting the JSON to the Avro object and return a tuple of (json, valid), then filter out the valid records and write the JSON data directly as parquet files. These parquet files have fields with type

    message root {
      optional group FIELD_FOO {
        optional binary string (UTF8);
      }
      .
      .
      .
    }

Similarly, I filter out the invalid records as corrupt data. This causes two scans over the RDDs: 1) filtering valid data, 2) filtering invalid data. If there is a better approach, please guide.

On Mon, Mar 21, 2016 at 11:07 PM, Michael Armbrust wrote:

> But when I tried using Spark streaming I could not find a way to store the
>> data with the avro schema information. The closest that I got was to create
>> a Dataframe using the json RDDs and store them as parquet. Here the parquet
>> files had a spark specific schema in their footer.
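On the two-scan concern: the parse result can be computed once per record by tagging, and the tagged data split afterwards. A minimal sketch with plain Scala collections and scala.util.Try (the `parse` function is a stand-in for the real JSON-to-Avro conversion, not your actual decoder):

```scala
import scala.util.Try

object SplitDemo {
  // Stand-in for "convert JSON to an Avro object": here anything
  // that parses as a Long counts as a valid record.
  def parse(json: String): Try[Long] = Try(json.trim.toLong)

  def main(args: Array[String]): Unit = {
    val records = Seq("1", "2", "oops", "4")
    // Tag each record exactly once with its validity...
    val tagged = records.map(r => (r, parse(r).isSuccess))
    // ...then split the tagged data without re-parsing anything.
    val (valid, invalid) = tagged.partition(_._2)
    println(valid.map(_._1))   // List(1, 2, 4)
    println(invalid.map(_._1)) // List(oops)
  }
}
```

With RDDs there is no single-pass `partition`, but `cache()`-ing the tagged RDD and filtering it twice means the expensive Avro conversion runs only once per record, even though Spark still makes two (now cheap) passes.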
> Does this cause a problem? This is just extra information that we use to
> store metadata that parquet doesn't directly support, but I would still
> expect other systems to be able to read it.
Re: Best way to store Avro Objects as Parquet using SPARK
Hi,

Which version of Spark are you using?

On Mon, Mar 21, 2016 at 12:28 PM, Sebastian Piu <sebastian@gmail.com> wrote:

> We use this, but not sure how the schema is stored
>
>     Job job = Job.getInstance();
>     ParquetOutputFormat.setWriteSupportClass(job, AvroWriteSupport.class);
>     AvroParquetOutputFormat.setSchema(job, schema);
>     LazyOutputFormat.setOutputFormatClass(job, new ParquetOutputFormat().getClass());
>     job.getConfiguration().set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false");
>     job.getConfiguration().set("parquet.enable.summary-metadata", "false");
>
>     // save the file
>     rdd.mapToPair(me -> new Tuple2(null, me))
>        .saveAsNewAPIHadoopFile(
>            String.format("%s/%s", path, timeStamp.milliseconds()),
>            Void.class,
>            clazz,
>            LazyOutputFormat.class,
>            job.getConfiguration());
>
> On Mon, 21 Mar 2016, 05:55 Manivannan Selvadurai, <smk.manivan...@gmail.com> wrote:
>
>> Hi All,
>>
>> In my current project there is a requirement to store Avro data (JSON
>> format) as parquet files. I was able to use AvroParquetWriter separately to
>> create the parquet files. Along with the data, the parquet files also had
>> the 'avro schema' stored as part of their footer.
>>
>> But when I tried using Spark streaming I could not find a way to store
>> the data with the Avro schema information. The closest that I got was to
>> create a Dataframe using the JSON RDDs and store them as parquet. Here the
>> parquet files had a Spark-specific schema in their footer.
>>
>> Is this the right approach or is there a better one? Please guide me.
>>
>> We are using Spark 1.4.1.
>>
>> Thanks in advance!!
Best way to store Avro Objects as Parquet using SPARK
Hi All, In my current project there is a requirement to store Avro data (JSON format) as parquet files. I was able to use AvroParquetWriter separately to create the parquet files. Along with the data, the parquet files also had the 'avro schema' stored as part of their footer. But when I tried using Spark streaming I could not find a way to store the data with the Avro schema information. The closest that I got was to create a Dataframe using the JSON RDDs and store them as parquet. Here the parquet files had a Spark-specific schema in their footer. Is this the right approach or is there a better one? Please guide me. We are using Spark 1.4.1. Thanks in advance!!