Spark REST API

2017-11-07 Thread Paul Corley
Is there a way to flush the API?

I execute http://localhost:18080/api/v1/applications?status=running

In the results I get a list of applications, but not all of them are still running.
This is causing an issue with monitoring what is actually running.

To compound the problem, these are streaming apps currently running on EMR.
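One workaround sketch, not a confirmed fix: each application in the REST payload carries per-attempt `completed` flags, so a monitor can filter on that field instead of trusting `?status=running`. The snippet below is a minimal illustration over a sample payload; the regex parsing stands in for a real JSON library, and it assumes one attempt per application.

```scala
// Minimal sketch: keep only apps whose attempt is not completed.
// Regex over the JSON text is for illustration only; a real monitor
// should use a proper JSON parser. Assumes one attempt per app and
// that "id" precedes the attempt's "completed" flag in the payload.
object RunningApps {
  private val AppId     = """"id"\s*:\s*"([^"]+)""".r
  private val Completed = """"completed"\s*:\s*(true|false)""".r

  def stillRunning(json: String): Seq[String] = {
    val ids   = AppId.findAllMatchIn(json).map(_.group(1)).toSeq
    val flags = Completed.findAllMatchIn(json).map(_.group(1) == "true").toSeq
    // Pair the i-th app id with the i-th completed flag and keep the unfinished ones.
    ids.zip(flags).collect { case (id, false) => id }
  }
}
```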


Paul Corley | Principal Data Engineer
IgnitionOne | Marketing Technology. Simplified.
Office:  1545 Peachtree St NE | Suite 500 | Atlanta, GA | 30309
Direct:  702.336.0094
Email:   paul.cor...@ignitionone.com


Structured Streaming from Parquet

2017-05-25 Thread Paul Corley
I have a Spark Structured Streaming process that is implemented in 2 separate 
streaming apps.

First App reads .gz files (ranging in size from 1 GB to 9 GB compressed) in from S3, filters out invalid records, repartitions the data, and outputs Parquet to S3, partitioned the same way the stream is partitioned. This process produces thousands of files which other processes consume. The thought behind this approach was to:

1)   Break the files down into smaller, more easily consumed sizes.

2)   Allow more parallelism in the processes that consume the data.

3)   Allow multiple downstream processes to consume data that:

a.   has already had bad records filtered out;

b.   does not force them to fully read in such large files.

The second application reads in the files produced by the first app and reformats the data from a row that is:

12NDSIN|20170101:123313, 5467;20170115:987

into:
12NDSIN, 20170101, 123313
12NDSIN, 20170101, 5467
12NDSIN, 20170115, 987
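That reshaping amounts to a two-level split, records by `;` and values by `,`, under the `|`-separated key, with `:` separating each date from its values. A plain-Scala sketch of the transform (the delimiters are read off the example above; the object and function names are made up, not from the actual app):

```scala
// Made-up sketch of the row explosion described above:
// "12NDSIN|20170101:123313, 5467;20170115:987"
// becomes one (id, date, value) tuple per value.
object RowExplode {
  def explode(line: String): Seq[(String, String, String)] = {
    // Key and payload are separated by '|'.
    val Array(id, payload) = line.split('|')
    // Records are separated by ';', each holding date ':' values.
    payload.split(';').toSeq.flatMap { record =>
      val Array(date, values) = record.split(':')
      // Values within a record are separated by ','.
      values.split(',').toSeq.map(v => (id.trim, date.trim, v.trim))
    }
  }
}
```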

App 1 runs with no problems and churns through files in its source directory on S3.
Total processing time for a file is < 10 min. App 2 is the one having issues.

The source is defined as:
val rawReader = sparkSession
  .readStream
  .option("latestFirst", "true")
  .option("maxFilesPerTrigger", batchSize)
  .schema(rawSchema)
  .parquet(config.getString("aws.s3.sourcepath"))  // <= line 85, referenced below
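As a side note on those two options: with `latestFirst` set, the backlog of pending files is considered newest-first, and `maxFilesPerTrigger` caps how many of them each micro-batch takes. A tiny plain-Scala sketch of that selection (the names here are made up for illustration, not Spark internals):

```scala
// Made-up sketch of the latestFirst + maxFilesPerTrigger selection:
// order the pending (path, modificationTime) pairs newest-first and
// take at most batchSize of them for the next micro-batch.
object BatchPick {
  def nextBatch(pending: Seq[(String, Long)], batchSize: Int): Seq[String] =
    pending.sortBy { case (_, modTime) => -modTime }.take(batchSize).map(_._1)
}
```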

The output is defined as:
val query = output
  .writeStream
  .queryName("bk")
  .format("parquet")
  .partitionBy("expireDate")
  .trigger(ProcessingTime("10 seconds"))
  .option("checkpointLocation", config.getString("spark.app.checkpoint_dir") + "/bk")
  .option("path", config.getString("spark.app.s3.output"))
  .start()
query.awaitTermination()

If files exist from app 1, app 2 enters a cycle of just spinning through the stage the history server UI shows as

parquet at ProcessFromSource.scala:85    3999/3999

If there are only a few files output from app 1, app 2 eventually enters the stage
where it actually processes the data and begins to output, but the more files
produced by app 1, the longer that takes, if it ever completes these steps. With an
extremely large number of files the app eventually throws a Java OOM error.
Additionally, each cycle through this step takes successively longer.
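For what it may be worth, one hedged guess at knobs to look at (an assumption on my part, not a diagnosis): Structured Streaming's file source and file sink each keep a compacted metadata log of seen/written files, and those logs are reread as they grow. The setting names below exist in Spark 2.x SQLConf, but whether tuning them helps this particular workload is unverified.

```scala
// Assumed-relevant knobs, not a confirmed fix: how often the file
// source/sink metadata logs are compacted (defaults are 10 batches).
// Larger intervals mean less frequent, but larger, compaction passes.
val sparkSession = org.apache.spark.sql.SparkSession.builder()
  .config("spark.sql.streaming.fileSource.log.compactInterval", "10")
  .config("spark.sql.streaming.fileSink.log.compactInterval", "10")
  .getOrCreate()
```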

Hopefully someone can lend some insight as to what is actually taking place in
this step and how to alleviate it.



Thanks,

Paul Corley | Principal Data Engineer