Re: getBytes : save as pdf

2018-10-10 Thread Joel D
I haven’t tried this, but you could try using a PDF library to write the
binary contents back out as PDF files.
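
If the zip entries are already complete PDFs, simply writing the raw bytes
back out might be enough. A rough, untested sketch (the RDD name and output
directory below are just placeholders):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// pdfBytesRdd: RDD[(String, Array[Byte])] of (entryName, pdfBytes) pairs (placeholder name)
pdfBytesRdd.foreachPartition { entries =>
  // create the FileSystem handle on the executor, not on the driver
  val fs = FileSystem.get(new Configuration())
  entries.foreach { case (name, bytes) =>
    val out = fs.create(new Path(s"/tmp/pdf-out/$name.pdf")) // placeholder output dir
    try out.write(bytes) finally out.close()
  }
}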

On Wed, Oct 10, 2018 at 11:30 AM ☼ R Nair 
wrote:

> All,
>
> I am reading a zipped file into an RDD and getting rdd._1 as the name
> and rdd._2.getBytes() as the content. How can I save the latter as a PDF?
> In fact the zipped file is a set of PDFs. I tried saveAsObjectFile and
> saveAsTextFile, but cannot read back the PDF. Any clue please?
>
> Best, Ravion
>


Process Million Binary Files

2018-10-10 Thread Joel D
Hi,

I need to process millions of PDFs in HDFS using Spark. First I’m trying
with some 40k files. I’m using the binaryFiles API, with which I’m facing a
couple of issues:

1. It creates only 4 tasks and I can’t seem to increase the parallelism
there.
2. It took 2276 seconds, which means that for millions of files it will take
ages to complete. I’m also expecting it to fail for millions of records with
some timeout or GC overhead exception.

val files = sparkSession.sparkContext.binaryFiles(filePath, 200).cache

val fileContentRdd = files.map(file => myFunc(file))



Do you have any guidance on how I can process millions of files using the
binaryFiles API?

How can I increase the number of tasks/parallelism during the creation of
the files RDD?
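
For reference, this is the kind of change I was considering, in case it helps
frame the question (a rough sketch; the config values are only guesses, and I
understand the spark.files.* settings usually have to be passed at submit
time, e.g. --conf spark.files.maxPartitionBytes=16777216, since they are read
when the RDD is created):

val files = sparkSession.sparkContext
  .binaryFiles(filePath, 200) // minPartitions seems to be only a hint

// force wider parallelism for the per-file work, regardless of how many
// splits binaryFiles actually produced
val fileContentRdd = files
  .repartition(200)
  .map(file => myFunc(file))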

Thanks


Re: Text from pdf spark

2018-09-28 Thread Joel D
Yes, I can access the file using cli.

On Fri, Sep 28, 2018 at 1:24 PM kathleen li  wrote:

> The error message is “file not found”
> Are you able to use the following command line to access the file as the
> user who submitted the job?
> hdfs dfs -ls /tmp/sample.pdf
>
> Sent from my iPhone
>
> On Sep 28, 2018, at 12:10 PM, Joel D  wrote:
>
> I'm trying to extract text from PDF files in HDFS using PDFBox.
>
> However it throws an error:
>
> "Exception in thread "main" org.apache.spark.SparkException: ...
>
> java.io.FileNotFoundException: /nnAlias:8020/tmp/sample.pdf
>
> (No such file or directory)"
>
>
>
>
> What am I missing? Should I be working with PortableDataStream instead of
> the string part of:
>
> val files: RDD[(String, PortableDataStream)]?
>
> def pdfRead(fileNameFromRDD: (String, PortableDataStream), sparkSession:
> SparkSession) = {
>
> val file: File = new File(fileNameFromRDD._1.drop(5))
>
> val document = PDDocument.load(file); //It throws an error here.
>
>
> if (!document.isEncrypted()) {
>
>   val stripper = new PDFTextStripper()
>
>   val text = stripper.getText(document)
>
>   println("Text:" + text)
>
>
> }
>
> document.close()
>
>
>   }
>
>
> //This is where I call the above pdf to text converter method.
>
>  val files =
> sparkSession.sparkContext.binaryFiles("hdfs://nnAlias:8020/tmp/sample.pdf")
>
> files.foreach(println)
>
>
> files.foreach(f => println(f._1))
>
>
> files.foreach(fileStream => pdfRead(fileStream, sparkSession))
>
>
> Thanks.
>


Text from pdf spark

2018-09-28 Thread Joel D
I'm trying to extract text from PDF files in HDFS using PDFBox.

However it throws an error:

"Exception in thread "main" org.apache.spark.SparkException: ...

java.io.FileNotFoundException: /nnAlias:8020/tmp/sample.pdf

(No such file or directory)"




What am I missing? Should I be working with PortableDataStream instead of
the string part of:

val files: RDD[(String, PortableDataStream)]?

def pdfRead(fileNameFromRDD: (String, PortableDataStream), sparkSession:
SparkSession) = {

val file: File = new File(fileNameFromRDD._1.drop(5))

val document = PDDocument.load(file); //It throws an error here.


if (!document.isEncrypted()) {

  val stripper = new PDFTextStripper()

  val text = stripper.getText(document)

  println("Text:" + text)


}

document.close()


  }


//This is where I call the above pdf to text converter method.

 val files =
sparkSession.sparkContext.binaryFiles("hdfs://nnAlias:8020/tmp/sample.pdf")

files.foreach(println)


files.foreach(f => println(f._1))


files.foreach(fileStream => pdfRead(fileStream, sparkSession))
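
To illustrate the PortableDataStream alternative I’m asking about, this is
roughly what I had in mind (untested; the imports assume PDFBox 2.x):

import java.io.InputStream
import org.apache.pdfbox.pdmodel.PDDocument
import org.apache.pdfbox.text.PDFTextStripper
import org.apache.spark.input.PortableDataStream

// read the PDF through the stream instead of java.io.File, since the path in
// _1 is an hdfs:// URI rather than a local file
def pdfTextFromStream(file: (String, PortableDataStream)): String = {
  val in: InputStream = file._2.open() // opens the HDFS stream on the executor
  val document = PDDocument.load(in)   // PDFBox can load from an InputStream
  try {
    if (!document.isEncrypted) new PDFTextStripper().getText(document) else ""
  } finally {
    document.close()
    in.close()
  }
}

val texts = files.map(pdfTextFromStream)
texts.take(1).foreach(println)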


Thanks.


Re: Multiple joins with same Dataframe throws AnalysisException: resolved attribute(s)

2018-07-19 Thread Joel D
One workaround is to rename the fid column in each df before joining.
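
Something along these lines, applied to the code quoted below (untested; the
column name mgr_fId is just an example):

val df2b = df2.withColumnRenamed("fId", "mgr_fId")

val join2 = join1
  .join(df2b, join1("mgrId") === df2b("mgr_fId"), "left")
  .select(join1("id"), join1("value"), join1("mgrId"), join1("name"),
          join1("df2id"), join1("fid"), df2b("mgr_fId").as("df2fid"))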

On Thu, Jul 19, 2018 at 9:50 PM  wrote:

> Spark 2.3.0 has this problem upgrade it to 2.3.1
>
> Sent from my iPhone
>
> On Jul 19, 2018, at 2:13 PM, Nirav Patel  wrote:
>
> Corrected subject line. It's a missing-attribute error, not an
> ambiguous-reference error.
>
> On Thu, Jul 19, 2018 at 2:11 PM, Nirav Patel 
> wrote:
>
>> I am getting a missing-attribute error after joining dataframe 'df2' twice.
>>
>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>> resolved attribute(s) *fid#49 *missing from
>> value#14,value#126,mgrId#15,name#16,d31#109,df2Id#125,df2Id#47,d4#130,d3#129,df1Id#13,name#128,
>> *fId#127* in operator !Join LeftOuter, (mgrId#15 = fid#49);;
>>
>> !Join LeftOuter, (mgrId#15 = fid#49)
>>
>> :- Project [df1Id#13, value#14, mgrId#15, name#16, df2Id#47, d3#51 AS
>> d31#109]
>>
>> :  +- Join Inner, (df1Id#13 = fid#49)
>>
>> : :- Project [_1#6 AS df1Id#13, _2#7 AS value#14, _3#8 AS mgrId#15,
>> _4#9 AS name#16, _5#10 AS d1#17, _6#11 AS d2#18]
>>
>> : :  +- LocalRelation [_1#6, _2#7, _3#8, _4#9, _5#10, _6#11]
>>
>> : +- Project [_1#40 AS df2Id#47, _2#41 AS value#48, _3#42 AS fId#49,
>> _4#43 AS name#50, _5#44 AS d3#51, _6#45 AS d4#52]
>>
>> :+- LocalRelation [_1#40, _2#41, _3#42, _4#43, _5#44, _6#45]
>>
>> +- Project [_1#40 AS df2Id#125, _2#41 AS value#126, _3#42 AS fId#127,
>> _4#43 AS name#128, _5#44 AS d3#129, _6#45 AS d4#130]
>>
>>+- LocalRelation [_1#40, _2#41, _3#42, _4#43, _5#44, _6#45]
>>
>>
>> at
>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(
>> CheckAnalysis.scala:40)
>>
>> at
>>
>>
>> As you can see, "fid" is present, but Spark is looking for fid#49 while the
>> one available is fId#127.
>>
>> Physical Plan of original df2 is
>>
>> == Physical Plan ==
>>
>> LocalTableScan [df2Id#47, value#48, fId#49, name#50, d3#51, d4#52]
>>
>>
>> But looking at the physical plan, it seems multiple versions of 'fid' get
>> generated (fid#49, fid#127).
>>
>> Here's the full code.
>>
>>
>> Code:
>>
>> val seq1 = Seq(
>>
>> (1,"a",1,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>>
>> (2,"a",0,"bla", "2014-01-01 00:00:00", "2014-09-12 18:55:43"),
>>
>> (3,"a",2,"bla", "2000-12-01 00:00:00", "2000-01-01 00:00:00"),
>>
>> (4,"bb",1,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>>
>> (5,"bb",2,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>>
>> (6,"bb",0,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"))
>>
>> //val rdd1 = spark.sparkContext.parallelize(seq1)
>>
>> val df1= seq1.toDF("id","value","mgrId", "name", "d1", "d2")
>>
>> df1.show()
>>
>>
>>
>> val seq2 = Seq(
>>
>> (1,"a1",1,"duh", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>>
>> (2,"a2",1,"duh", "2014-01-01 00:00:00", "2014-09-12 18:55:43"),
>>
>> (3,"a3",2,"jah", "2000-12-01 00:00:00", "2000-01-01 00:00:00"),
>>
>> (4,"a4",3,"duh", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>>
>> (5,"a5",4,"jah", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>>
>> (6,"a6",5,"jah", "2014-01-01 00:00:00", "2014-01-01 00:00:00"))
>>
>>
>>
>>
>>
>> val df2 = seq2.toDF("id","value","fId", "name", "d1", "d2")
>>
>> df2.explain()
>>
>> df2.show()
>>
>>
>>
>> val join1 = df1
>>
>>   .join(df2,
>>
>> df1("id") === df2("fid"))
>>
>>   .select(df1("id"), df1("value"), df1("mgrId"), df1("name"), df2(
>> "id").as("df2id"), df2("fid"), df2("value"))
>>
>> join1.printSchema()
>>
>> join1.show()
>>
>>
>>
>> val join2 = join1
>>
>>   .join(df2,
>>
>>   join1("mgrId") === df2("fid"),
>>
>>   "left")
>>
>>.select(join1("id"), join1("value"), join1("mgrId"), join1("name"),
>> join1("df2id"),
>>
>>join1("fid"), df2("fid").as("df2fid"))
>>
>> join2.printSchema()
>>
>> join2.show()
>>
>


Re: [EXTERNAL] - Re: testing frameworks

2018-05-22 Thread Joel D
We’ve developed our own testing framework consisting of different areas of
checks; in some of them we provide expected data and compare it with the
resultant data produced by the data object.
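
As a rough illustration of the “expected vs. resultant data” style of check
(a minimal sketch, not our actual framework; the names and the transformation
are made up):

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("pipeline-test").getOrCreate()
import spark.implicits._

// the transformation under test would normally come from the job's own code
def withDoubled(df: DataFrame): DataFrame = df.withColumn("doubled", $"value" * 2)

val input    = Seq((1, 10), (2, 20)).toDF("id", "value")
val expected = Seq((1, 10, 20), (2, 20, 40)).toDF("id", "value", "doubled")
val actual   = withDoubled(input)

// compare contents regardless of row order; empty diffs both ways mean a pass
assert(actual.except(expected).count() == 0 && expected.except(actual).count() == 0)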

Cheers.

On Tue, May 22, 2018 at 1:48 PM Steve Pruitt  wrote:

> Something more along the lines of integration, I believe. Run one or more
> Spark jobs and verify the output results, if that makes sense.
>
>
>
> I am very new to the world of Spark.  We want to include pipeline testing
> from the get go.  I will check out spark-testing-base.
>
>
>
>
>
> Thanks.
>
>
>
> *From:* Holden Karau [mailto:hol...@pigscanfly.ca]
> *Sent:* Monday, May 21, 2018 11:32 AM
> *To:* Steve Pruitt 
> *Cc:* user@spark.apache.org
> *Subject:* [EXTERNAL] - Re: testing frameworks
>
>
>
> So I’m biased as the author of spark-testing-base but I think it’s pretty
> ok. Are you looking for unit or integration or something else?
>
>
>
> On Mon, May 21, 2018 at 5:24 AM Steve Pruitt  wrote:
>
> Hi,
>
>
>
> Can anyone recommend testing frameworks suitable for Spark jobs?
> Something that can be integrated into a CI tool would be great.
>
>
>
> Thanks.
>
>
>
> --
>
> Twitter: https://twitter.com/holdenkarau
> 
>


No Tasks have reported metrics yet

2018-01-10 Thread Joel D
Hi,

I have a job that runs a HiveQL query joining 2 tables (2.5 TB and 45 GB),
repartitions to 100 partitions and then does some other transformations. This
executed fine earlier.

Job stages:
Stage 0: Hive table 1 scan
Stage 1: Hive table 2 scan
Stage 2: Tungsten exchange for the join
Stage 3: Tungsten exchange for the repartition

Today the job is stuck in Stage 2. Of the 200 tasks that are supposed to
execute, none have started, but 290 task attempts have failed due to
preempted executors.

Any inputs on how to resolve this issue? I'll try reducing the executor
memory to see if resource allocation is the issue.
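
In case it helps frame the question, this is roughly what I plan to try next
(a sketch only; the values are guesses to experiment with, not settings I’m
confident in):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-join-job") // placeholder name
  .config("spark.executor.memory", "8g")         // smaller executors, easier to schedule
  .config("spark.executor.cores", "4")
  .config("spark.sql.shuffle.partitions", "400") // the stuck stage currently has the default 200
  .enableHiveSupport()
  .getOrCreate()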

Thanks.


Re: Process large JSON file without causing OOM

2017-11-13 Thread Joel D
Have you tried increasing driver and executor memory (and the GC overhead
settings too, if required)?

Your code snippet and stack trace would be helpful.
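
One thing to keep in mind with an embedded local[*] cluster: everything runs
inside the driver JVM, so the service’s own heap (-Xmx) is the real memory
ceiling, and spark.executor.memory has little effect there. A minimal sketch
of the kind of settings I mean (values are examples only):

import org.apache.spark.sql.SparkSession

// run the embedding Java service with a larger heap first, e.g. java -Xmx8g ...
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("json-to-orc") // placeholder name
  .config("spark.memory.fraction", "0.6") // default; execution/storage share of that heap
  .getOrCreate()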

On Mon, Nov 13, 2017 at 7:23 PM Alec Swan  wrote:

> Hello,
>
> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
> format. Effectively, my Java service starts up an embedded Spark cluster
> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
> keep getting OOM errors with large (~1GB) files.
>
> I've tried different ways to reduce memory usage, e.g. by partitioning
> data with dataSet.partitionBy("customer").save(filePath), or capping memory
> usage by setting spark.executor.memory=1G, but to no avail.
>
> I am wondering if there is a way to avoid OOM besides splitting the source
> JSON file into multiple smaller ones and processing the small ones
> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
> in its entirety before converting it to ORC (columnar)? If so, would it
> make sense to create a custom receiver that reads the Snappy file and use
> Spark Streaming for the ORC conversion?
>
> Thanks,
>
> Alec
>


Schema Evolution Parquet vs Avro

2017-05-29 Thread Joel D
Hi,

We are trying to come up with the best storage format for handling schema
changes in ingested data.

We noticed that both Avro and Parquet allow selecting by column name instead
of by column index/position. However, we are inclined towards Parquet for
better read performance since it is columnar and we will be selecting a few
columns instead of all of them. Data will be processed and saved to
partitions, on which we will have Hive external tables.

Will Parquet be able to handle the following:
- Renaming a column in the middle of the schema
- Removing a column from the middle of the schema
- Changing the data type of an existing column (int to bigint should be
allowed, right?)
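
For reference, this is the kind of quick local check we were planning to run
(just a sketch; paths and column names are made up, and it assumes an
existing SparkSession named spark):

import spark.implicits._

// write two "versions" of the data into the same directory tree
Seq((1, "a", 10)).toDF("id", "name", "cnt")
  .write.parquet("/tmp/evolve/batch=1")

// v2: "cnt" renamed to "count" and widened to bigint, "name" dropped
Seq((2, 20L)).toDF("id", "count")
  .write.parquet("/tmp/evolve/batch=2")

// read with schema merging and inspect which columns survive; a rename will
// typically show up as two separate columns rather than as a rename
val merged = spark.read.option("mergeSchema", "true").parquet("/tmp/evolve")
merged.printSchema()
merged.show()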

Please advise.

Thanks,
Sam