dropping unused data from a stream
I will be streaming data and am trying to understand how to drop old data from a stream so it does not become too large. I will stream in one large table of buying data and join it to another table of different data. I need the last 14 days from the second table; I will not need data that is older than 14 days. Here is my practice code:

streaming1 = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
    .csv("input_stream_csv1")
streaming1_with_impressions = streaming1.withWatermark("creation_time", "2 minutes")
streaming2 = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
    .csv("input_stream_csv2")
streaming1.registerTempTable("my_table1")
streaming2.registerTempTable("my_table2")
spark.sql("""select t1.* from my_table1 t1
             inner join my_table2 t2 on t1.key = t2.key
             where t1.creation_time < current_timestamp() - interval 15 minutes""")\
    .writeStream.trigger(processingTime='10 seconds')\
    .format("parquet")\
    .option("checkpointLocation", "checkpoint_dir").outputMode("append")\
    .option("path", "stream_dir5").start()

The important part of the code is the where clause in the SQL statement: "where t1.creation_time < current_timestamp() - interval 15 minutes". For this example, I am hoping that the stream will not contain any rows older than 15 minutes. Is this assumption correct? I am not sure how to test this. In addition, I have set a watermark of 2 minutes on the first stream. I am thinking that this watermark will make Spark wait an additional 2 minutes for any data that comes in late. Thanks! -- Henry Tremblay Data Engineer, Best Buy
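For reference, one way this kind of retention is usually expressed in Structured Streaming (Spark 2.3 or later) is with watermarks on both streams plus an event-time range condition in the join, rather than a filter on current_timestamp(). The following is only a minimal sketch under that assumption; data_schema stands in for the dataSchema used in the question, and the paths are the same practice paths:

# Sketch only: assumes Spark 2.3+ (stream-stream joins) and a placeholder
# schema standing in for the question's dataSchema.
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder.appName("stream_join_sketch").getOrCreate()

data_schema = StructType([
    StructField("key", StringType()),
    StructField("creation_time", TimestampType()),
])

# Watermark both sides so Spark knows how long to keep join state around.
buys = (spark.readStream.schema(data_schema)
        .option("maxFilesPerTrigger", 1)
        .csv("input_stream_csv1")
        .withWatermark("creation_time", "2 minutes")
        .alias("buys"))

recent = (spark.readStream.schema(data_schema)
          .option("maxFilesPerTrigger", 1)
          .csv("input_stream_csv2")
          .withWatermark("creation_time", "14 days")
          .alias("recent"))

# The event-time range condition is what lets Spark drop buffered rows older
# than 14 days from the second stream's join state.
joined = buys.join(
    recent,
    expr("""buys.key = recent.key AND
            recent.creation_time >= buys.creation_time - interval 14 days AND
            recent.creation_time <= buys.creation_time"""))

result = joined.select("buys.*")

query = (result.writeStream.trigger(processingTime="10 seconds")
         .format("parquet")
         .option("checkpointLocation", "checkpoint_dir")
         .option("path", "stream_dir5")
         .outputMode("append")
         .start())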
Re: Problem in persisting file in S3 using Spark: xxx file does not exist Exception
I would like to see the full error. However, S3 can give misleading messages if you don't have the correct permissions. On Tue, Apr 24, 2018, 2:28 PM Marco Mistroni wrote: > Hi all, > I am using the following code for persisting data into S3 (AWS keys are > already stored in the environment variables) > > dataFrame.coalesce(1).write.format("com.databricks.spark.csv").save(fileName) > > > However, I keep receiving an exception that the file does not exist. > > Here's what comes from the logs: > > 18/04/24 22:15:32 INFO Persiste: Persisting data to text file: > s3://ec2-bucket-mm-spark/form4-results-2404.results > Exception in thread "main" java.io.IOException: > /form4-results-2404.results doesn't exist > > It seems that Spark expects the file to be there before writing, which > seems bizarre. > > I have even tried to remove the coalesce, but still got the same exception. > Could anyone help, please? > Kind regards, > Marco >
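As a point of comparison only, here is a minimal sketch of the same write using the CSV writer built into Spark 2.x and an s3a:// path; the bucket and prefix are placeholders, not the ones from the thread, and the s3a connector plus credentials are assumed to be configured:

# Sketch: write a single CSV part file to S3 with the built-in writer.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("csv_to_s3_sketch").getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])  # toy data

# Spark writes a directory of part files; coalesce(1) forces one part file.
(df.coalesce(1)
   .write.mode("overwrite")
   .csv("s3a://my-bucket/form4-results/"))   # placeholder bucket/prefix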
splitting a huge file
We are tasked with loading a big file (possibly 2TB) into a data warehouse. In order to do this efficiently, we need to split the file into smaller files. I don't believe there is a way to do this with Spark, because in order for Spark to distribute the file to the worker nodes, it first has to be split up, right? We ended up using a single machine with a single thread to do the splitting. I just want to make sure I am not missing something obvious. Thanks! -- Paul Henry Tremblay Attunix
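For what it's worth, when the file is in a splittable format (plain text, not a single gzip) and already sits on a distributed store such as HDFS or S3, a sketch like the following would let Spark itself produce the smaller files; the paths and partition count here are made up for illustration:

# Sketch: split one large text file into N smaller part files, assuming the
# input is a splittable format (plain text, not a single .gz) on HDFS/S3.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("split_big_file").getOrCreate()

(spark.read.text("hdfs:///staging/big_file.txt")   # hypothetical input path
     .repartition(2000)                            # number of output files
     .write.mode("overwrite")
     .text("hdfs:///staging/big_file_parts/"))     # hypothetical output path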
small job runs out of memory using wholeTextFiles
As part of my processing, I have the following code: rdd = sc.wholeTextFiles("s3://paulhtremblay/noaa_tmp/", 10) rdd.count() The S3 directory has about 8GB of data and 61,878 files. I am using Spark 2.1, and running it with 15 m3.xlarge nodes on EMR. The job fails with this error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 35532 in stage 0.0 failed 4 times, most recent failure: Lost task 35532.3 in stage 0.0 (TID 35543, ip-172-31-36-192.us-west-2.compute.internal, executor 6): ExecutorLostFailure (executor 6 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 7.4 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. I have run it dozens of times, increasing the number of partitions and reducing the size of my data set (the original is 60GB), but I get the same error each time. In contrast, if I run a simple: rdd = sc.textFile("s3://paulhtremblay/noaa_tmp/") rdd.count() the job finishes in 15 minutes, even with just 3 nodes. Thanks -- Paul Henry Tremblay Robert Half Technology
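For reference, the overhead setting named in that error can be raised when the SparkContext is created (or equivalently with --conf on spark-submit); a sketch, where the 2048 MiB and 4g values are only illustrative starting points:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("wholeTextFiles_job")
        # value is in MiB; 2048 is only an illustrative starting point
        .set("spark.yarn.executor.memoryOverhead", "2048")
        .set("spark.executor.memory", "4g"))
sc = SparkContext(conf=conf)

rdd = sc.wholeTextFiles("s3://paulhtremblay/noaa_tmp/", 10)
print(rdd.count())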
Re: bug with PYTHONHASHSEED
I saw the bug fix. I am using the latest Spark available on AWS EMR, which I think is 2.01. I am at work and can't check my home config. I don't think AWS merged in this fix. Henry On Tue, Apr 4, 2017 at 4:42 PM, Jeff Zhang wrote: > > It is fixed in https://issues.apache.org/jira/browse/SPARK-13330 > > > > Holden Karau wrote on Wed, Apr 5, 2017 at 12:03 AM: > >> Which version of Spark is this (or is it a dev build)? We've recently >> made some improvements with PYTHONHASHSEED propagation. >> >> On Tue, Apr 4, 2017 at 7:49 AM Eike von Seggern wrote: >> >> 2017-04-01 21:54 GMT+02:00 Paul Tremblay : >> >> When I try to do a groupByKey() in my spark environment, I get the >> error described here: >> >> http://stackoverflow.com/questions/36798833/what-does-exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh >> >> In order to attempt to fix the problem, I set up my IPython environment >> with the additional line: >> >> PYTHONHASHSEED=1 >> >> When I fire up my IPython shell, and do: >> >> In [7]: hash("foo") >> Out[7]: -2457967226571033580 >> >> In [8]: hash("foo") >> Out[8]: -2457967226571033580 >> >> So my hash function is now seeded so it returns consistent values. But >> when I do a groupByKey(), I get the same error: >> >> >> Exception: Randomness of hash of string should be disabled via >> PYTHONHASHSEED >> >> Anyone know how to fix this problem in Python 3.4? >> >> >> Independent of the Python version, you have to ensure that Python on >> spark-master and -workers is started with PYTHONHASHSEED set, e.g. by >> adding it to the environment of the spark processes. >> >> Best >> >> Eike >> >> -- >> Cell: 425-233-8271 >> Twitter: https://twitter.com/holdenkarau >> > -- Paul Henry Tremblay Robert Half Technology
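A sketch of what "adding it to the environment of the spark processes" can look like from the driver code, using Spark's spark.executorEnv.* and spark.yarn.appMasterEnv.* settings (the seed value 0 is arbitrary; any fixed integer works):

import os
from pyspark import SparkConf, SparkContext

# Present in the driver's environment for completeness; setting it here will
# not re-seed an interpreter that is already running.
os.environ["PYTHONHASHSEED"] = "0"

conf = (SparkConf()
        .setAppName("groupByKey_job")
        .set("spark.executorEnv.PYTHONHASHSEED", "0")          # executor workers
        .set("spark.yarn.appMasterEnv.PYTHONHASHSEED", "0"))   # YARN AM (cluster mode)
sc = SparkContext(conf=conf)

rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3)])
print(rdd.groupByKey().mapValues(list).collect())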
Re: bug with PYTHONHASHSEED
So that means I have to pass that bash variable to the EMR clusters when I spin them up, not afterwards. I'll give that a go. Thanks! Henry On Tue, Apr 4, 2017 at 7:49 AM, Eike von Seggern wrote: > 2017-04-01 21:54 GMT+02:00 Paul Tremblay : > >> When I try to to do a groupByKey() in my spark environment, I get the >> error described here: >> >> http://stackoverflow.com/questions/36798833/what-does-except >> ion-randomness-of-hash-of-string-should-be-disabled-via-pythonh >> >> In order to attempt to fix the problem, I set up my ipython environment >> with the additional line: >> >> PYTHONHASHSEED=1 >> >> When I fire up my ipython shell, and do: >> >> In [7]: hash("foo") >> Out[7]: -2457967226571033580 >> >> In [8]: hash("foo") >> Out[8]: -2457967226571033580 >> >> So my hash function is now seeded so it returns consistent values. But >> when I do a groupByKey(), I get the same error: >> >> >> Exception: Randomness of hash of string should be disabled via >> PYTHONHASHSEED >> >> Anyone know how to fix this problem in python 3.4? >> > > Independent of the python version, you have to ensure that Python on > spark-master and -workers is started with PYTHONHASHSEED set, e.g. by > adding it to the environment of the spark processes. > > Best > > Eike > -- Paul Henry Tremblay Robert Half Technology
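A sketch of what that can look like at cluster-launch time: passing a configurations JSON to aws emr create-cluster so the variable is in place before any Python worker starts. This assumes the usual EMR --configurations file format, and the seed value is arbitrary:

[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.executorEnv.PYTHONHASHSEED": "0",
      "spark.yarn.appMasterEnv.PYTHONHASHSEED": "0"
    }
  }
]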
Re: Alternatives for dataframe collectAsList()
What do you want to do with the results of the query? Henry On Wed, Mar 29, 2017 at 12:00 PM, szep.laszlo.it wrote: > Hi, > > After I create a dataset > > Dataset df = sqlContext.sql("query"); > > I need the result values, so I call the method collectAsList(): > > List list = df.collectAsList(); > > But it's very slow if I work with large datasets (20-30 million records). I > know that the result isn't present in the driver app; that's why it takes a > long time, because collectAsList() collects all the data from the worker nodes. > > But then what is the right way to get the result values? Is there another > solution to iterate over the rows of a result dataset, or to get the values? Can anyone > post a small & working example? > > Thanks & Regards, > Laszlo Szep > > > > -- > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-for-dataframe-collectAsList-tp28547.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >
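Depending on the answer, two common alternatives to collectAsList() are iterating without materializing everything at once, and writing the result out so it never has to come back to the driver. A rough sketch in PySpark terms (the original question is Java, so this only shows the shape of the APIs; the query and output path are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("collect_alternatives").getOrCreate()
df = spark.range(1000)   # stand-in for sqlContext.sql("query")

# Option 1: stream rows back a partition at a time instead of all at once.
for row in df.toLocalIterator():
    print(row)

# Option 2: keep the work on the cluster and persist the result instead.
df.write.mode("overwrite").parquet("hdfs:///tmp/query_result/")   # placeholder path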
Re: Read file and represent rows as Vectors
So if I am understanding your problem, you have the data in CSV files, but the CSV files are gzipped? If so, Spark can read a gzipped file directly. Sorry if I didn't understand your question. Henry On Mon, Apr 3, 2017 at 5:05 AM, Old-School wrote: > I have a dataset that contains DocID, WordID and frequency (count) as shown > below. Note that the first three numbers represent 1. the number of > documents, 2. the number of words in the vocabulary and 3. the total number > of words in the collection. > > 189 > 1430 > 12300 > 1 2 1 > 1 39 1 > 1 42 3 > 1 77 1 > 1 95 1 > 1 96 1 > 2 105 1 > 2 108 1 > 3 133 3 > > > What I want to do is to read the data (ignoring the first three lines), > combine the words per document and finally represent each document as a > vector that contains the frequency of each wordID. > > Based on the above dataset, the representation of documents 1, 2 and 3 will > be (note that vocab_size can be extracted from the second line of the data): > > val data = Array( > Vectors.sparse(vocab_size, Seq((2, 1.0), (39, 1.0), (42, 3.0), (77, 1.0), > (95, 1.0), (96, 1.0))), > Vectors.sparse(vocab_size, Seq((105, 1.0), (108, 1.0))), > Vectors.sparse(vocab_size, Seq((133, 3.0)))) > > > The problem is that I am not quite sure how to read the .txt.gz file as an RDD > and create an Array of sparse vectors as described above. Please note that I > actually want to pass the data array to the PCA transformer. > > > > -- > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Read-file-and-represent-rows-as-Vectors-tp28562.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >
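If it helps, here is a rough PySpark sketch of the read-and-vectorize step (the original snippet is Scala; the gzipped path is a placeholder, and the doc/word/count column positions and word-ID range are assumed from the sample above):

from pyspark import SparkContext
from pyspark.mllib.linalg import Vectors

sc = SparkContext(appName="docs_to_sparse_vectors")

lines = sc.textFile("docword.txt.gz")    # gzipped text is read transparently
header = lines.take(3)                   # [num_docs, vocab_size, total_words]
vocab_size = int(header[1])              # assumes word IDs are < vocab_size

docs = (lines.zipWithIndex()
             .filter(lambda x: x[1] >= 3)                         # drop 3 header lines
             .map(lambda x: x[0].split())                         # [doc, word, count]
             .map(lambda t: (int(t[0]), (int(t[1]), float(t[2]))))
             .groupByKey()
             .mapValues(lambda pairs: Vectors.sparse(vocab_size, sorted(pairs))))

data = docs.values().collect()   # list of SparseVectors, as in the question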
Re: Looking at EMR Logs
Thanks. That seems to work great, except EMR doesn't always copy the logs to S3. The behavior seems inconsistent and I am debugging it now. On Fri, Mar 31, 2017 at 7:46 AM, Vadim Semenov wrote: > You can provide your own log directory, where Spark log will be saved, and > that you could replay afterwards. > > Set in your job this: `spark.eventLog.dir=s3://bucket/some/directory` and > run it. > Note! The path `s3://bucket/some/directory` must exist before you run your > job, it'll not be created automatically. > > The Spark HistoryServer on EMR won't show you anything because it's > looking for logs in `hdfs:///var/log/spark/apps` by default. > > After that you can either copy the log files from s3 to the hdfs path > above, or you can copy them locally to `/tmp/spark-events` (the default > directory for spark logs) and run the history server like: > ``` > cd /usr/local/src/spark-1.6.1-bin-hadoop2.6 > sbin/start-history-server.sh > ``` > and then open http://localhost:18080 > > > > > On Thu, Mar 30, 2017 at 8:45 PM, Paul Tremblay > wrote: > >> I am looking for tips on evaluating my Spark job after it has run. >> >> I know that right now I can look at the history of jobs through the web >> ui. I also know how to look at the current resources being used by a >> similar web ui. >> >> However, I would like to look at the logs after the job is finished to >> evaluate such things as how many tasks were completed, how many executors >> were used, etc. I currently save my logs to S3. >> >> Thanks! >> >> Henry >> >> -- >> Paul Henry Tremblay >> Robert Half Technology >> > > -- Paul Henry Tremblay Robert Half Technology
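A sketch of the conf side of that, set when the job's context is created; the bucket path is the placeholder from the earlier reply and must already exist, as noted above:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("job_with_event_logs")
        .set("spark.eventLog.enabled", "true")
        .set("spark.eventLog.dir", "s3://bucket/some/directory"))  # must pre-exist
sc = SparkContext(conf=conf)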
bug with PYTHONHASHSEED
When I try to do a groupByKey() in my spark environment, I get the error described here: http://stackoverflow.com/questions/36798833/what-does-exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh In order to attempt to fix the problem, I set up my IPython environment with the additional line: PYTHONHASHSEED=1 When I fire up my IPython shell, and do: In [7]: hash("foo") Out[7]: -2457967226571033580 In [8]: hash("foo") Out[8]: -2457967226571033580 So my hash function is now seeded so it returns consistent values. But when I do a groupByKey(), I get the same error: Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED Does anyone know how to fix this problem in Python 3.4? Thanks Henry -- Paul Henry Tremblay Robert Half Technology
pyspark bug with PYTHONHASHSEED
When I try to do a groupByKey() in my spark environment, I get the error described here: http://stackoverflow.com/questions/36798833/what-does-exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh In order to attempt to fix the problem, I set up my IPython environment with the additional line: PYTHONHASHSEED=1 When I fire up my IPython shell, and do: In [7]: hash("foo") Out[7]: -2457967226571033580 In [8]: hash("foo") Out[8]: -2457967226571033580 So my hash function is now seeded so it returns consistent values. But when I do a groupByKey(), I get the same error: Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED Does anyone know how to fix this problem in Python 3.4? Thanks Henry -- Paul Henry Tremblay Robert Half Technology
Looking at EMR Logs
I am looking for tips on evaluating my Spark job after it has run. I know that right now I can look at the history of jobs through the web ui. I also know how to look at the current resources being used by a similar web ui. However, I would like to look at the logs after the job is finished to evaluate such things as how many tasks were completed, how many executors were used, etc. I currently save my logs to S3. Thanks! Henry -- Paul Henry Tremblay Robert Half Technology
Re: wholeTextFiles fails, but textFile succeeds for same path
I've been working on this problem for several days (I am doing more to increase my knowledge of Spark). The code you linked to hangs because after reading in the file, I have to gunzip it. Another way that seems to be working is reading each file in using sc.textFile, then writing it to HDFS, and then using wholeTextFiles on the HDFS result. But the bigger issue is that both methods are not executed in parallel. When I open my yarn manager, it shows that only one node is being used. Henry On 02/06/2017 03:39 PM, Jon Gregg wrote: Strange that it's working for some directories but not others. Looks like wholeTextFiles maybe doesn't work with S3? https://issues.apache.org/jira/browse/SPARK-4414 . If it's possible to load the data into EMR and run Spark from there, that may be a workaround. This blog post shows a Python workaround that might work as well: http://michaelryanbell.com/processing-whole-files-spark-s3.html Jon On Mon, Feb 6, 2017 at 6:38 PM, Paul Tremblay wrote: I've actually been able to trace the problem to the files being read in. If I change to a different directory, then I don't get the error. Is one of the executors running out of memory? On 02/06/2017 02:35 PM, Paul Tremblay wrote: When I try to create an rdd using wholeTextFiles, I get an incomprehensible error. But when I use the same path with sc.textFile, I get no error. I am using pyspark with spark 2.1. in_path = 's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/ rdd = sc.wholeTextFiles(in_path) rdd.take(1) /usr/lib/spark/python/pyspark/rdd.py in take(self, num) 1341 1342 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) -> 1343 res = self.context.runJob(self, takeUpToNumLeft, p) 1344 1345 items += res /usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal) 963 # SparkContext#runJob. 964 mappedRDD = rdd.mapPartitions(partitionFunc) --> 965 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) 966 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer)) 967 /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args) 1131 answer = self.gateway_client.send_command(command) 1132 return_value = get_return_value( -> 1133 answer, self.gateway_client, self.target_id, self.name) 1134 1135 for temp_arg in temp_args: /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 61 def deco(*a, **kw): 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: 65 s = e.java_exception.toString() /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 317 raise Py4JJavaError( 318 "An error occurred while calling {0}{1}{2}.\n". --> 319 format(target_id, ".", name), value) 320 else: 321 raise Py4JError( Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 7, ip-172-31-45-114.us-west-2.compute.internal, executor 8): ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Container marked as failed: container_1486415078210_0005_01_16 on host: ip-172-31-45-114.us-west-2.compute.internal. Exit status: 52. 
Diagnostics: Exception from container-launch. Container id: container_1486415078210_0005_01_16 Exit code: 52 Stack trace: ExitCodeException exitCode=52: at org.apache.hadoop.util.Shell.runCommand(Shell.java:582) at org.apache.hadoop.util.Shell.run(Shell.java:479) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773) at org.apache.hadoop.yarn.server
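On the gunzip point: one workaround in the same spirit as the blog post linked above is to read each gzipped file as raw bytes and decompress it in Python. A rough sketch, assuming Python 3, a placeholder path, and files small enough to fit in an executor's memory:

import gzip
from pyspark import SparkContext

sc = SparkContext(appName="gz_whole_files_sketch")

# Each element is (filename, raw bytes); decompress and decode per file.
raw = sc.binaryFiles("s3://my-bucket/warc-segment/")   # placeholder path
texts = raw.mapValues(lambda b: gzip.decompress(bytes(b)).decode("utf-8", errors="replace"))

print(texts.keys().take(1))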
Re: Turning rows into columns
Yes, that's what I need. Thanks. P. On 02/05/2017 12:17 PM, Koert Kuipers wrote: Since there is no key to group by and assemble records, I would suggest writing this in RDD land and then converting to a data frame. You can use sc.wholeTextFiles to process the text files and create a state machine. On Feb 4, 2017 16:25, "Paul Tremblay" wrote: I am using pyspark 2.1 and am wondering how to convert a flat file, with one record per row, into a columnar format. Here is an example of the data: u'WARC/1.0', u'WARC-Type: warcinfo', u'WARC-Date: 2016-12-08T13:00:23Z', u'WARC-Record-ID: ', u'Content-Length: 344', u'Content-Type: application/warc-fields', u'WARC-Filename: CC-MAIN-20161202170900-0-ip-10-31-129-80.ec2.internal.warc.gz', u'', u'robots: classic', u'hostname: ip-10-31-129-80.ec2.internal', u'software: Nutch 1.6 (CC)/CC WarcExport 1.0', u'isPartOf: CC-MAIN-2016-50', u'operator: CommonCrawl Admin', u'description: Wide crawl of the web for November 2016', u'publisher: CommonCrawl', u'format: WARC File Format 1.0', u'conformsTo: http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf', u'', u'', u'WARC/1.0', u'WARC-Type: request', u'WARC-Date: 2016-12-02T17:54:09Z', u'WARC-Record-ID: ', u'Content-Length: 220', u'Content-Type: application/http; msgtype=request', u'WARC-Warcinfo-ID: ', u'WARC-IP-Address: 217.197.115.133', u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/', u'', u'GET /blog/ HTTP/1.0', u'Host: 1018201.vkrugudruzei.ru', u'Accept-Encoding: x-gzip, gzip, deflate', u'User-Agent: CCBot/2.0 (http://commoncrawl.org/faq/)', u'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', u'', u'', u'', u'WARC/1.0', u'WARC-Type: response', u'WARC-Date: 2016-12-02T17:54:09Z', u'WARC-Record-ID: ', u'Content-Length: 577', u'Content-Type: application/http; msgtype=response', u'WARC-Warcinfo-ID: ', u'WARC-Concurrent-To: ', u'WARC-IP-Address: 217.197.115.133', u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/', u'WARC-Payload-Digest: sha1:Y4TZFLB6UTXHU4HUVONBXC5NZQW2LYMM', u'WARC-Block-Digest: sha1:3J7HHBMWTSC7W53DDB7BHTUVPM26QS4B', u''] I want to convert it to something like: {warc-type='request', warc-date='2016-12-02', warc-record-id=' -- Paul Henry Tremblay Robert Half Technology
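A rough sketch of the state-machine idea over sc.wholeTextFiles, only to show the shape; the "WARC/1.0" record delimiter and header names are taken from the sample above, while the input path and the chosen output columns are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("warc_headers_to_records").getOrCreate()
sc = spark.sparkContext

def parse_records(file_and_text):
    """Walk the lines of one WARC file and emit one dict per header block."""
    _, text = file_and_text
    record = {}
    for line in text.splitlines():
        line = line.strip()
        if line == "WARC/1.0":            # a new record starts
            if record:
                yield record
            record = {}
        elif ": " in line:
            key, value = line.split(": ", 1)
            record[key.lower()] = value
    if record:
        yield record

files = sc.wholeTextFiles("s3://my-bucket/warc-files/")   # placeholder path
records = files.flatMap(parse_records)

df = spark.createDataFrame(
    records.map(lambda d: (d.get("warc-type"), d.get("warc-date"),
                           d.get("warc-record-id"))),
    ["warc_type", "warc_date", "warc_record_id"])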
Re: wholeTextFiles fails, but textFile succeeds for same path
I've actually been able to trace the problem to the files being read in. If I change to a different directory, then I don't get the error. Is one of the executors running out of memory? On 02/06/2017 02:35 PM, Paul Tremblay wrote: When I try to create an rdd using wholeTextFiles, I get an incomprehensible error. But when I use the same path with sc.textFile, I get no error. I am using pyspark with spark 2.1. in_path = 's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/ rdd = sc.wholeTextFiles(in_path) rdd.take(1) /usr/lib/spark/python/pyspark/rdd.py in take(self, num) 1341 1342 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) -> 1343 res = self.context.runJob(self, takeUpToNumLeft, p) 1344 1345 items += res /usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal) 963 # SparkContext#runJob. 964 mappedRDD = rdd.mapPartitions(partitionFunc) --> 965 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) 966 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer)) 967 /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args) 1131 answer = self.gateway_client.send_command(command) 1132 return_value = get_return_value( -> 1133 answer, self.gateway_client, self.target_id, self.name) 1134 1135 for temp_arg in temp_args: /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 61 def deco(*a, **kw): 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: 65 s = e.java_exception.toString() /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 317 raise Py4JJavaError( 318 "An error occurred while calling {0}{1}{2}.\n". --> 319 format(target_id, ".", name), value) 320 else: 321 raise Py4JError( Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 7, ip-172-31-45-114.us-west-2.compute.internal, executor 8): ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Container marked as failed: container_1486415078210_0005_01_16 on host: ip-172-31-45-114.us-west-2.compute.internal. Exit status: 52. Diagnostics: Exception from container-launch. Container id: container_1486415078210_0005_01_16 Exit code: 52 Stack trace: ExitCodeException exitCode=52: at org.apache.hadoop.util.Shell.runCommand(Shell.java:582) at org.apache.hadoop.util.Shell.run(Shell.java:479) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) rdd = sc.textFile(in_path) In [8]: rdd.take(1) Out[8]: [u'WARC/1.0'] - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
wholeTextFiles fails, but textFile succeeds for same path
When I try to create an rdd using wholeTextFiles, I get an incomprehensible error. But when I use the same path with sc.textFile, I get no error. I am using pyspark with spark 2.1. in_path = 's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/ rdd = sc.wholeTextFiles(in_path) rdd.take(1) /usr/lib/spark/python/pyspark/rdd.py in take(self, num) 1341 1342 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) -> 1343 res = self.context.runJob(self, takeUpToNumLeft, p) 1344 1345 items += res /usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal) 963 # SparkContext#runJob. 964 mappedRDD = rdd.mapPartitions(partitionFunc) --> 965 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) 966 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer)) 967 /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args) 1131 answer = self.gateway_client.send_command(command) 1132 return_value = get_return_value( -> 1133 answer, self.gateway_client, self.target_id, self.name) 1134 1135 for temp_arg in temp_args: /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 61 def deco(*a, **kw): 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: 65 s = e.java_exception.toString() /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 317 raise Py4JJavaError( 318 "An error occurred while calling {0}{1}{2}.\n". --> 319 format(target_id, ".", name), value) 320 else: 321 raise Py4JError( Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 7, ip-172-31-45-114.us-west-2.compute.internal, executor 8): ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Container marked as failed: container_1486415078210_0005_01_16 on host: ip-172-31-45-114.us-west-2.compute.internal. Exit status: 52. Diagnostics: Exception from container-launch. Container id: container_1486415078210_0005_01_16 Exit code: 52 Stack trace: ExitCodeException exitCode=52: at org.apache.hadoop.util.Shell.runCommand(Shell.java:582) at org.apache.hadoop.util.Shell.run(Shell.java:479) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) rdd = sc.textFile(in_path) In [8]: rdd.take(1) Out[8]: [u'WARC/1.0'] - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Turning rows into columns
I am using pyspark 2.1 and am wondering how to convert a flat file, with one record per row, into a columnar format. Here is an example of the data: u'WARC/1.0', u'WARC-Type: warcinfo', u'WARC-Date: 2016-12-08T13:00:23Z', u'WARC-Record-ID: ', u'Content-Length: 344', u'Content-Type: application/warc-fields', u'WARC-Filename: CC-MAIN-20161202170900-0-ip-10-31-129-80.ec2.internal.warc.gz', u'', u'robots: classic', u'hostname: ip-10-31-129-80.ec2.internal', u'software: Nutch 1.6 (CC)/CC WarcExport 1.0', u'isPartOf: CC-MAIN-2016-50', u'operator: CommonCrawl Admin', u'description: Wide crawl of the web for November 2016', u'publisher: CommonCrawl', u'format: WARC File Format 1.0', u'conformsTo: http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf', u'', u'', u'WARC/1.0', u'WARC-Type: request', u'WARC-Date: 2016-12-02T17:54:09Z', u'WARC-Record-ID: ', u'Content-Length: 220', u'Content-Type: application/http; msgtype=request', u'WARC-Warcinfo-ID: ', u'WARC-IP-Address: 217.197.115.133', u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/', u'', u'GET /blog/ HTTP/1.0', u'Host: 1018201.vkrugudruzei.ru', u'Accept-Encoding: x-gzip, gzip, deflate', u'User-Agent: CCBot/2.0 (http://commoncrawl.org/faq/)', u'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', u'', u'', u'', u'WARC/1.0', u'WARC-Type: response', u'WARC-Date: 2016-12-02T17:54:09Z', u'WARC-Record-ID: ', u'Content-Length: 577', u'Content-Type: application/http; msgtype=response', u'WARC-Warcinfo-ID: ', u'WARC-Concurrent-To: ', u'WARC-IP-Address: 217.197.115.133', u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/', u'WARC-Payload-Digest: sha1:Y4TZFLB6UTXHU4HUVONBXC5NZQW2LYMM', u'WARC-Block-Digest: sha1:3J7HHBMWTSC7W53DDB7BHTUVPM26QS4B', u''] I want to convert it to something like: {warc-type='request',warc-date='2016-12-02'. ward-record-id='