RE: How to avoid long-running jobs blocking short-running jobs

2018-11-05 Thread Taylor Cox
Hi Conner,

What is preventing you from using a cluster model?
I wonder if docker containers could help you here?
A quick internet search yielded Mist: https://github.com/Hydrospheredata/mist 
Could be useful?

Taylor

-Original Message-
From: conner  
Sent: Saturday, November 3, 2018 2:04 AM
To: user@spark.apache.org
Subject: How to avoid long-running jobs blocking short-running jobs

Hi,

I use a Spark cluster to run ETL jobs and to run analysis computations on the data after the ETL stage.
The ETL jobs can keep running for several hours, but the analysis computations are short-running jobs that finish in a few seconds.
The dilemma I am stuck in is that my application runs in a single JVM and can't be a cluster application, so there is just one SparkContext in my application at the moment. When the ETL jobs are running, they occupy all the resources, including the worker executors, for so long that they block all my analysis computation jobs.

My solution is to find a good way to divide the Spark cluster's resources into two parts: one part for analysis computation jobs, another for ETL jobs. If the ETL part is free, I can allocate analysis computation jobs to it as well.
So I want to find a middleware that supports two SparkContexts and can be embedded in my application. I did some research on the third-party project Spark Job Server. It can divide Spark resources by launching another JVM that runs a SparkContext with a specific resource allocation.
These operations are invisible to the upper layer, so that would be a good solution for me. But that project runs in its own JVM and only supports a REST API, and I can't tolerate transferring the data over the network again; that is too slow for me. I want to get results from the Spark cluster over a direct TCP connection and hand them to the view layer for display.
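Concretely, what I have in mind looks something like the sketch below: run the ETL work and the analysis work as two separate Spark applications, each with a hard cap on the resources it may take. This is only a rough sketch, assuming a standalone cluster; the master URL and the core/memory numbers are placeholders, not my real settings.

from pyspark import SparkConf
from pyspark.sql import SparkSession

MASTER = "spark://master-host:7077"   # placeholder master URL

def build_etl_session():
    # Driver #1: the long-running ETL work, hard-capped so it can never take the whole cluster.
    conf = (SparkConf()
            .setAppName("etl-jobs")
            .setMaster(MASTER)
            .set("spark.cores.max", "24")        # cap on the total cores this application may use
            .set("spark.executor.memory", "8g"))
    return SparkSession.builder.config(conf=conf).getOrCreate()

def build_analysis_session():
    # Driver #2: the short analysis jobs, run as a separate application with their own slice.
    conf = (SparkConf()
            .setAppName("analysis-jobs")
            .setMaster(MASTER)
            .set("spark.cores.max", "8")         # these cores stay free of the ETL workload
            .set("spark.executor.memory", "4g"))
    return SparkSession.builder.config(conf=conf).getOrCreate()

Each builder would be called from its own driver process, so the cap on the ETL application would guarantee that the analysis application always has executors available. But that still leaves me with two JVMs and the data transfer problem described above.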
Can anybody give me some good suggestions? I would be very grateful.





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





RE: Shuffle write explosion

2018-11-05 Thread Taylor Cox
At first glance, I wonder if your tables are partitioned? There may not be 
enough parallelism happening. You can also pass in the number of partitions 
and/or a custom partitioner to help Spark “guess” how to organize the shuffle.
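In PySpark terms, the two knobs look roughly like the sketch below (the original snippet is Java, but the idea carries over; the key format and the partition count here are made up for illustration):

from pyspark import SparkContext

sc = SparkContext(appName="shuffle-tuning-sketch")

# Toy pair RDD standing in for the real sequence-file input.
pairs = sc.parallelize([("key-%04d" % i, "value-%d" % i) for i in range(100000)])

num_partitions = 512

# Option 1: ask for an explicit partition count, so the shuffle is spread over more, smaller tasks.
repartitioned = pairs.repartition(num_partitions)

# Option 2: a custom partitioner. partitionBy(numPartitions, partitionFunc) routes each key
# through partitionFunc; keeping related keys together can reduce how much each task shuffles.
def by_prefix(key):
    return hash(key[:4])   # everything sharing a 4-character prefix lands in the same partition

partitioned = pairs.partitionBy(num_partitions, by_prefix)
print(partitioned.getNumPartitions())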

Have you seen any of these docs?
https://people.eecs.berkeley.edu/~kubitron/courses/cs262a-F13/projects/reports/project16_report.pdf
https://spark.apache.org/docs/latest/tuning.html

Taylor


From: Yichen Zhou 
Sent: Sunday, November 4, 2018 11:42 PM
To: user@spark.apache.org
Subject: Shuffle write explosion

Hi All,

When running a Spark job, I have 100 MB+ of input but get more than 700 GB of shuffle write, which is really weird. And this job finally ends up with an OOM error. Does anybody know why this happens?
[Screen Shot 2018-11-05 at 15.20.35.png]
My code is like:
JavaPairRDD<Text, Text> inputRDD = sc.sequenceFile(inputPath, Text.class, Text.class);

inputRDD.repartition(partitionNum)
        .mapToPair(...)
        .saveAsNewAPIHadoopDataset(job.getConfiguration());

Environment:
CPU 32 core; Memory 256G; Storage 7.5G
CentOS 7.5
java version "1.8.0_162"
Spark 2.1.2

Any help is greatly appreciated.

Regards,
Yichen


RE: CSV parser - is there a way to find malformed csv record

2018-10-09 Thread Taylor Cox
Hey Nirav,

Here’s an idea:

Suppose your file.csv has N records, one for each line. Read the csv 
line-by-line (without spark) and attempt to parse each line. If a record is 
malformed, catch the exception and rethrow it with the line number. That should 
show you where the problematic record(s) can be found.
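A rough sketch of that idea in plain Python (the expected column count is a placeholder you would take from your schema):

import csv

EXPECTED_COLUMNS = 42   # placeholder: set this to the width of your schema

def find_bad_records(path):
    # Walk the file outside of Spark and report which record does not parse cleanly.
    # (With quoted multi-line fields, this counts records rather than physical lines.)
    with open(path, newline="") as f:
        reader = csv.reader(f)
        record_number = 0
        while True:
            try:
                record = next(reader)
            except StopIteration:
                break
            except csv.Error as e:
                # A genuinely unparseable record: re-raise with its position attached.
                raise RuntimeError("Malformed CSV record #%d: %s" % (record_number + 1, e))
            record_number += 1
            if len(record) != EXPECTED_COLUMNS:
                print("Record %d has %d fields, expected %d"
                      % (record_number, len(record), EXPECTED_COLUMNS))

find_bad_records("file.csv")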

From: Nirav Patel 
Sent: Monday, October 8, 2018 11:57 AM
To: spark users 
Subject: CSV parser - is there a way to find malformed csv record

I am getting `RuntimeException: Malformed CSV record` while parsing CSV records and attaching a schema at the same time. Most likely there are additional commas or JSON data in some fields that are not escaped properly. Is there a way the CSV parser can tell me which record is malformed?


This is what I am using:

val df2 = sparkSession.read
  .option("inferSchema", true)
  .option("multiLine", true)
  .schema(headerDF.schema) // this only works without column mismatch
  .csv(dataPath)

Thanks






RE: How to do sliding window operation on RDDs in Pyspark?

2018-10-02 Thread Taylor Cox
Have a look at this guide here:
https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

You should be able to send your sensor data to a Kafka topic, which Spark will 
subscribe to. You may need to use an Input DStream to connect Kafka to Spark.

https://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/#read-parallelism-in-spark-streaming
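Here is a bare-bones sketch of the Spark side, assuming a broker at localhost:9092, a topic named sensor-data, one numeric sample per Kafka message, and the matching spark-streaming-kafka package available to Spark (all of those are placeholders for whatever your producer ends up doing):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="sensor-stream-sketch")
ssc = StreamingContext(sc, 1)   # 1-second micro-batches

# The direct stream yields (key, value) pairs; here each value is one raw sensor sample.
stream = KafkaUtils.createDirectStream(
    ssc,
    topics=["sensor-data"],                              # placeholder topic name
    kafkaParams={"metadata.broker.list": "localhost:9092"})

samples = stream.map(lambda kv: float(kv[1]))            # parse the message value into a float
windowed = samples.window(3, 2)                          # 3-second windows, sliding every 2 seconds
windowed.pprint()

ssc.start()
ssc.awaitTermination()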

Taylor

-Original Message-
From: zakhavan  
Sent: Tuesday, October 2, 2018 1:16 PM
To: user@spark.apache.org
Subject: RE: How to do sliding window operation on RDDs in Pyspark?

Thank you, Taylor, for your reply. The second solution doesn't work for my case since my text files get updated every second. Actually, my input data is live: I'm getting 2 streams of data from 2 seismic sensors, and for simplicity I write them into 2 text files, which keep getting updated in real time. It seems I need to change my data collection method and store the data as 2 DStreams instead. I know Kafka would work, but I don't know how to do that, because I would need to implement custom Kafka code to take the incoming data from the sensors and produce it as DStreams.

The following code is how I'm getting the data and writing it into 2 text files.

Do you have any idea how I could use Kafka in this case so that I end up with DStreams instead of RDDs?

from obspy.clients.seedlink.easyseedlink import create_client
from obspy import read
import numpy as np
import obspy
from obspy import UTCDateTime


def handle_data(trace):
    print('Received new data:')
    print(trace)
    print()

    if trace.stats.network == "IU":
        trace.write("/home/zeinab/data1.mseed")
        st1 = obspy.read("/home/zeinab/data1.mseed")
        for i, el1 in enumerate(st1):
            f = open("%s_%d" % ("out_file1.txt", i), "a")
            f1 = open("%s_%d" % ("timestamp_file1.txt", i), "a")
            np.savetxt(f, el1.data, fmt="%f")
            np.savetxt(f1, el1.times("utcdatetime"), fmt="%s")
            f.close()
            f1.close()
    if trace.stats.network == "CU":
        trace.write("/home/zeinab/data2.mseed")
        st2 = obspy.read("/home/zeinab/data2.mseed")
        for j, el2 in enumerate(st2):
            ff = open("%s_%d" % ("out_file2.txt", j), "a")
            ff1 = open("%s_%d" % ("timestamp_file2.txt", j), "a")
            np.savetxt(ff, el2.data, fmt="%f")
            np.savetxt(ff1, el2.times("utcdatetime"), fmt="%s")
            ff.close()
            ff1.close()


client = create_client('rtserve.iris.washington.edu:18000', handle_data)
client.select_stream('IU', 'ANMO', 'BHZ')
client.select_stream('CU', 'ANWB', 'BHZ')
client.run()
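What I imagine, but haven't tried, is replacing the text-file writes with a small Kafka producer, and then letting Spark consume those topics as DStreams. The rough sketch below assumes a local Kafka broker and the kafka-python package, neither of which is part of my current setup; the topic names are made up.

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")   # placeholder broker address

def handle_data(trace):
    # Pick a topic by network code, mirroring the two text files I write today.
    topic = "sensor-iu" if trace.stats.network == "IU" else "sensor-cu"
    for sample, timestamp in zip(trace.data, trace.times("utcdatetime")):
        # One message per sample: "timestamp,value" as plain UTF-8 text.
        producer.send(topic, value=("%s,%f" % (timestamp, sample)).encode("utf-8"))
    producer.flush()

But I'm not sure whether this is the right way to hook the sensor feed into Spark.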

Thank you,

Zeinab



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





RE: How to do sliding window operation on RDDs in Pyspark?

2018-10-02 Thread Taylor Cox
Hey Zeinab,

We may have to take a small step back here. The sliding-window approach (i.e., the window() operation) is unique to data stream mining, so it makes sense that window() is restricted to DStreams.

It looks like you're not using a stream mining approach. From what I can see in your code, the files are read in, and you apply the window() operation after you already have all the information.

Here's what can solve your problem:
1) Read the inputs into two DStreams and use window() as needed (a minimal sketch follows below), or
2) You can always take a range of inputs from a spark RDD. Perhaps this will 
set you in the right direction:
https://stackoverflow.com/questions/24677180/how-do-i-select-a-range-of-elements-in-spark-rdd
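For option 1, a minimal sketch, assuming the two sensor feeds land as new files in two directories that Spark can watch (the directory paths and window sizes are placeholders):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="sliding-window-sketch")
ssc = StreamingContext(sc, 1)   # 1-second batches

# textFileStream only picks up files newly created in the directory, not appends to existing files.
stream1 = ssc.textFileStream("/home/zeinab/stream1")   # placeholder directories
stream2 = ssc.textFileStream("/home/zeinab/stream2")

values1 = stream1.map(float).window(3, 2)   # window length 3 s, sliding every 2 s
values2 = stream2.map(float).window(3, 2)

values1.pprint()
values2.pprint()

ssc.start()
ssc.awaitTermination()

The Pearson correlation itself would still be computed per batch (for example inside foreachRDD), since Statistics.corr works on RDDs rather than DStreams.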

Let me know if this helps your issue,

Taylor

-Original Message-
From: zakhavan  
Sent: Tuesday, October 2, 2018 9:30 AM
To: user@spark.apache.org
Subject: How to do sliding window operation on RDDs in Pyspark?

Hello,

I have 2 text files in the following form, and my goal is to calculate the Pearson correlation between them using a sliding window in PySpark:

123.00
-12.00
334.00
.
.
.

First I read these 2 text files and store them as RDDs, and then I apply the window operation on each RDD, but I keep getting this error:

AttributeError: 'PipelinedRDD' object has no attribute 'window'

Here is my code:

if __name__ == "__main__":
    spark = SparkSession.builder.appName("CrossCorrelation").getOrCreate()
    # DEFINE your input path
    input_path1 = sys.argv[1]
    input_path2 = sys.argv[2]

    num_of_partitions = 4
    rdd1 = spark.sparkContext.textFile(input_path1, num_of_partitions) \
        .flatMap(lambda line1: line1.strip().split("\n")) \
        .map(lambda strelem1: float(strelem1))
    rdd2 = spark.sparkContext.textFile(input_path2, num_of_partitions) \
        .flatMap(lambda line2: line2.strip().split("\n")) \
        .map(lambda strelem2: float(strelem2))

    # Windowing
    windowedrdd1 = rdd1.window(3, 2)
    windowedrdd2 = rdd2.window(3, 2)

    # Correlation between sliding windows
    CrossCorr = Statistics.corr(windowedrdd1, windowedrdd2, method="pearson")

    if CrossCorr >= 0.7:
        print("rdd1 & rdd2 are correlated")

I know from the error that the window operation is only available for DStreams, but since I have RDDs here, how can I do a window operation on RDDs?

Thank you,

Zeinab





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

