Re: Spark Dataframe returning null columns when schema is specified

2017-09-08 Thread Praneeth Gayam
What is the desired behaviour when a field is null for only a few records?
You cannot avoid nulls in that case.
But if all rows are guaranteed to be uniform (either all-null or
all-non-null), you can *take* the first row of the DF and *drop* the
columns with null fields.
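
For illustration, a minimal sketch of that approach in Scala (the code in
this thread is Java, but the idea is the same); here df stands for the
DataFrame read with the explicit schema, and the variable names are only
illustrative:

// Sketch only: assumes every row is uniform (all-null or all-non-null per column)
// and that df is non-empty (the thread already guards with rdd.count() > 0).
val firstRow = df.head()
val nullCols = df.columns.filter(c => firstRow.isNullAt(firstRow.fieldIndex(c)))
// DataFrame.drop takes one column name at a time in Spark 1.x, so fold over the list.
val dfWithoutNulls = nullCols.foldLeft(df)((acc, c) => acc.drop(c))
dfWithoutNulls.show()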

On Fri, Sep 8, 2017 at 12:14 AM, ravi6c2  wrote:

> Hi All, I have a problem where the Spark DataFrame ends up with null
> columns for attributes that are not present in the JSON. A clear
> explanation is provided below:
>
> *Use case:* Convert the JSON objects into a dataframe for further usage.
>
> *Case - 1:* Without specifying the schema for JSON:
>
> records.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public void call(JavaRDD<String> rdd, Time time) throws Exception {
>         if (rdd.count() > 0) {
>             JavaRDD<String> filteredRDD = rdd.filter(x -> x.length() > 0);
>             sqlContext = SQLContextSingleton.getInstance(filteredRDD.context());
>             DataFrame df = sqlContext.read().json(filteredRDD);
>             df.show();
>         }
>     }
> });
>
> In the above code sample, filteredRDD is a JavaRDD of Strings, each a JSON object.
>
> *Sample JSON Record: *
> {"request_id":"f791e831f71e4918b2fcaebfdf6fe2c2","org_id":"y08e7p9g","
> queue_id":1234,"disposition":"O","created":"2017-06-02
> 23:49:10.410","assigned":"2017-06-02
> 23:49:10.410","final_review_status":"cancel","datetime":"2017-06-02
> 23:49:10.410"}
>
> *Dataframe Output:*
>
> [Screenshot: Screenshot_at_Sep_07_11-36-27.png]
>
> *Case - 2:* Specifying the schema for JSON:
>
> records.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public void call(JavaRDD<String> rdd, Time time) throws Exception {
>         if (rdd.count() > 0) {
>             JavaRDD<String> filteredRDD = rdd.filter(x -> x.length() > 0);
>             sqlContext = SQLContextSingleton.getInstance(filteredRDD.context());
>             DataFrame df = sqlContext.read().schema(SchemaBuilder.buildSchema()).json(filteredRDD);
>             df.show();
>         }
>     }
> });
>
> In the above code sample, filteredRDD is a JavaRDD of Strings, each a JSON object.
>
> *Schema Definition:*
> public static StructType buildSchema() {
>     StructType schema = new StructType(new StructField[] {
>         DataTypes.createStructField("request_id", DataTypes.StringType, false),
>         DataTypes.createStructField("org_id", DataTypes.StringType, false),
>         DataTypes.createStructField("queue_id", DataTypes.IntegerType, true),
>         DataTypes.createStructField("owner", DataTypes.StringType, true),
>         DataTypes.createStructField("disposition", DataTypes.StringType, true),
>         DataTypes.createStructField("created", DataTypes.TimestampType, true),
>         DataTypes.createStructField("created_user", DataTypes.StringType, true),
>         DataTypes.createStructField("assigned", DataTypes.TimestampType, true),
>         DataTypes.createStructField("assigned_user", DataTypes.StringType, true),
>         DataTypes.createStructField("notes", DataTypes.StringType, true),
>         DataTypes.createStructField("final_review_status", DataTypes.StringType, true),
>         DataTypes.createStructField("event_tag", DataTypes.StringType, true),
>         DataTypes.createStructField("additional_data", DataTypes.StringType, true),
>         DataTypes.createStructField("datetime", DataTypes.TimestampType, true),
>         DataTypes.createStructField("dc", DataTypes.StringType, true),
>         DataTypes.createStructField("case_id", DataTypes.StringType, true),
>         DataTypes.createStructField("case_status", DataTypes.StringType, true)
>     });
>     return schema;
> }
>
> *Sample JSON Record: *
> {"request_id":"f791e831f71e4918b2fcaebfdf6fe2c2","org_id":"y08e7p9g","
> queue_id":1234,"disposition":"O","created":"2017-06-02
> 23:49:10.410","assigned":"2017-06-02
> 23:49:10.410","final_review_status":"cancel","datetime":"2017-06-02
> 23:49:10.410"}
>
> *Dataframe Output:*
>
> As you can see, when the schema is defined I get the columns that are not
> present in the JSON as null. Is there any workaround to get the result as
> in the first image (without the nulls) while still using a schema? I
> needed this to perform updates into 

Re: Chaining Spark Streaming Jobs

2017-09-08 Thread Praneeth Gayam
With file streams you will have to deal with the following:

   1. The file(s) must not be changed once created. So if the files are
   being continuously appended, the new data will not be read. Refer to the
   file-stream section of the Spark Streaming programming guide.

   2. The files must be created in the dataDirectory by atomically *moving*
or *renaming* them into the data directory.

Since the latency requirement for the second job in the chain is only a
few minutes, you may end up having to create a new file every few minutes.

You may want to consider Kafka as your intermediary store for building a
chain/DAG of streaming jobs, as sketched below.
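
For illustration, a rough sketch of what this could look like with
Structured Streaming (Spark 2.2+) and its Kafka source/sink; it needs the
spark-sql-kafka-0-10 package, and the broker address, topic names, and
checkpoint path are placeholders, not taken from this thread:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("raw-persist").getOrCreate()

// Job 1: read the raw stream and republish it to an intermediate topic
// (the raw S3/parquet persistence can run as a second query on the same stream).
val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
  .option("subscribe", "raw-logs")                    // placeholder topic
  .load()
  .selectExpr("CAST(value AS STRING) AS value")

raw.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "raw-persisted")                             // intermediate topic
  .option("checkpointLocation", "s3://bucket/checkpoints/raw")  // placeholder path
  .start()

// Job 2 (a separate application): subscribe to the intermediate topic and enrich.
val toEnrich = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "raw-persisted")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
// ...apply the project-specific enrichment here and write out with its own sink.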

On Fri, Sep 8, 2017 at 9:45 AM, Sunita Arvind  wrote:

> Thanks for your response Michael
> Will try it out.
>
> Regards
> Sunita
>
> On Wed, Aug 23, 2017 at 2:30 PM Michael Armbrust 
> wrote:
>
>> If you use structured streaming and the file sink, you can have a
>> subsequent stream read using the file source.  This will maintain exactly
>> once processing even if there are hiccups or failures.
>>
>> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind 
>> wrote:
>>
>>> Hello Spark Experts,
>>>
>>> I have a design question w.r.t Spark Streaming. I have a streaming job
>>> that consumes protocol buffer encoded real time logs from a Kafka cluster
>>> on premise. My spark application runs on EMR (aws) and persists data onto
>>> s3. Before I persist, I need to strip header and convert protobuffer to
>>> parquet (I use sparksql-scalapb to convert from Protobuff to
>>> Spark.sql.Row). I need to persist Raw logs as is. I can continue the
>>> enrichment on the same dataframe after persisting the raw data, however, in
>>> order to modularize I am planning to have a separate job which picks up the
>>> raw data and performs enrichment on it. Also,  I am trying to avoid all in
>>> 1 job as the enrichments could get project specific while raw data
>>> persistence stays customer/project agnostic. The enriched data is allowed
>>> to have some latency (a few minutes).
>>>
>>> My challenge is, after persisting the raw data, how do I chain the next
>>> streaming job. The only way I can think of is -  job 1 (raw data)
>>> partitions on current date (MMDD) and within current date, the job 2
>>> (enrichment job) filters for records within 60s of current time and
>>> performs enrichment on it in 60s batches.
>>> Is this a good option? It seems to be error prone. When either of the
>>> jobs get delayed due to bursts or any error/exception this could lead to
>>> huge data losses and non-deterministic behavior. What are other
>>> alternatives to this?
>>>
>>> Appreciate any guidance in this regard.
>>>
>>> regards
>>> Sunita Koppar
>>>
>>
>>


Re: use WithColumn with external function in a java jar

2017-08-28 Thread Praneeth Gayam
You can create a UDF which will invoke your Java lib, for example:

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

def calculateExpense: UserDefinedFunction =
  udf((pexpense: String, cexpense: String) =>
    new MyJava().calculateExpense(pexpense.toDouble, cexpense.toDouble))
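
You can then apply it with withColumn, for example (assuming your input
DataFrame is called df; the name is illustrative):

import org.apache.spark.sql.functions.col

// "expense" is the new calculated column; pexpense/cexpense are the input columns.
val result = df.withColumn("expense", calculateExpense(col("pexpense"), col("cexpense")))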





On Tue, Aug 29, 2017 at 6:53 AM, purna pradeep 
wrote:

> I have data in a DataFrame with the below columns:
>
> 1) File format is CSV
> 2) All of the below column datatypes are String
>
> employeeid,pexpense,cexpense
>
> Now I need to create a new DataFrame which has a new column called
> `expense`, calculated from the columns `pexpense` and `cexpense`.
>
> The tricky part is that the calculation algorithm is not a **UDF** that I
> created, but an external function that needs to be imported from a Java
> library and takes primitive types as arguments - in this case `pexpense`
> and `cexpense` - to calculate the value required for the new column.
>
> The external function signature
>
> public class MyJava {
>
>     public Double calculateExpense(Double pexpense, Double cexpense) {
>         // calculation
>     }
>
> }
>
> So how can I invoke that external function to create a new calculated
> column? Can I register that external function as a UDF in my Spark
> application?
>
> Stackoverflow reference
>
> https://stackoverflow.com/questions/45928007/use-withcolumn-with-external-function
>
>
>
>
>
>


Re: Error while reading the CSV

2017-04-07 Thread Praneeth Gayam
Try the following

spark-shell --master yarn-client --name nayan \
  /opt/packages/-data-prepration/target/scala-2.10/-data-prepration-assembly-1.0.jar


On Thu, Apr 6, 2017 at 6:36 PM, nayan sharma 
wrote:

> Hi All,
> I am getting an error while loading a CSV file.
>
> val datacsv = sqlContext.read.format("com.databricks.spark.csv")
>   .option("header", "true")
>   .load("timeline.csv")
>
> java.lang.NoSuchMethodError: org.apache.commons.csv.CSVFormat.withQuote(Ljava/lang/Character;)Lorg/apache/commons/csv/CSVFormat;
>
>
> I have added the dependency in the sbt file:
>
> // Spark Additional Library - CSV Read as DF
> libraryDependencies += "com.databricks" %% "spark-csv" % "1.5.0"
>
> *and I am starting the spark-shell with the command:*
>
> spark-shell --master yarn-client \
>   --jars /opt/packages/-data-prepration/target/scala-2.10/-data-prepration-assembly-1.0.jar \
>   --name nayan
>
>
>
> Thanks for any help!!
>
>
> Thanks,
> Nayan
>