dropping unused data from a stream

2019-01-22 Thread Paul Tremblay
I will be streaming data and am trying to understand how to get rid of old
data from a stream so that it does not become too large. I will stream in one
large table of buying data and join it to another table of different data. I
need only the last 14 days from the second table; I will not need data that
is older than 14 days.

Here is my practice code:


streaming1 = spark.readStream.schema(dataSchema)\
    .option("maxFilesPerTrigger", 1)\
    .csv("input_stream_csv1")
streaming1_with_impressions = streaming1.withWatermark("creation_time", "2 minutes")
streaming2 = spark.readStream.schema(dataSchema)\
    .option("maxFilesPerTrigger", 1)\
    .csv("input_stream_csv2")
streaming1.registerTempTable("my_table1")
streaming2.registerTempTable("my_table2")
spark.sql("""select t1.* from my_table1 t1
    inner join my_table2 t2 on t1.key = t2.key
    where t1.creation_time < current_timestamp() - interval 15 minutes""")\
    .writeStream.trigger(processingTime='10 seconds')\
    .format("parquet")\
    .option("checkpointLocation", "checkpoint_dir")\
    .outputMode("append")\
    .option("path", "stream_dir5")\
    .start()

The important part of the code is the WHERE clause in the SQL statement:
"where t1.creation_time < current_timestamp() - interval 15 minutes"

For this example, I am hoping that the stream's state will not retain any
rows older than 15 minutes. Is this assumption correct? I am not sure how to
test this. In addition, I have set a 2-minute watermark on the first stream.
I am thinking that this watermark will make Spark wait an additional 2
minutes for any data that arrives late.
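If it helps make the question concrete, here is a rough sketch of the same join rewritten with a watermark on both sides and an explicit event-time range condition, which is my understanding (from the docs) of what lets Spark drop old state. Both streams use dataSchema above, so I assume "key" and "creation_time" exist on both; the 15-minute bound is just a guess at what I want:

from pyspark.sql.functions import expr

# Keep only the columns needed for the sketch and rename them so the join
# condition below is unambiguous.
s1 = (spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)
      .csv("input_stream_csv1")
      .selectExpr("key AS key1", "creation_time AS time1")
      .withWatermark("time1", "2 minutes"))
s2 = (spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)
      .csv("input_stream_csv2")
      .selectExpr("key AS key2", "creation_time AS time2")
      .withWatermark("time2", "2 minutes"))

# Join on the key plus a bounded range of event time, so state outside the
# range can be dropped once the watermark passes it.
joined = s1.join(s2, expr("""
    key1 = key2 AND
    time2 BETWEEN time1 - interval 15 minutes AND time1"""))

Is that the intended pattern, or does the where clause above already achieve the same thing?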

Thanks!
-- 
Henry Tremblay
Data Engineer, Best Buy


Re: Problem in persisting file in S3 using Spark: xxx file does not exist Exception

2018-05-02 Thread Paul Tremblay
I would like to see the full error. However, S3 can give misleading
messages if you don't have the correct permissions.
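For comparison, this is roughly what I would try first (a sketch only, assuming Spark 2.x; the bucket name is made up), using the built-in CSV writer and the s3a scheme, with credentials known to have write access to the bucket:

# Hypothetical example: write the DataFrame from the code below as CSV to S3.
out_path = "s3a://my-example-bucket/form4-results"   # made-up bucket/path
dataFrame.coalesce(1).write.mode("overwrite").csv(out_path)

If that still fails, the full stack trace would help narrow down whether it is a permissions problem or something else.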

On Tue, Apr 24, 2018, 2:28 PM Marco Mistroni  wrote:

> Hi all,
>  I am using the following code for persisting data into S3 (AWS keys are
> already stored in the environment variables)
>
> dataFrame.coalesce(1).write.format("com.databricks.spark.csv").save(fileName)
>
>
> However, I keep on receiving an exception that the file does not exist
>
> here's what comes from logs
>
> 18/04/24 22:15:32 INFO Persiste: Persisting data to text file:
> s3://ec2-bucket-mm-spark/form4-results-2404.results
> Exception in thread "main" java.io.IOException:
> /form4-results-2404.results doesn't exist
>
> It seems that Spark expects the file to already exist before writing, which
> seems bizarre?
>
> I have even tried removing the coalesce, but I still got the same exception.
> Could anyone help please?
> kind regards
>  marco
>


splitting a huge file

2017-04-21 Thread Paul Tremblay
We are tasked with loading a big file (possibly 2TB) into a data warehouse.
In order to do this efficiently, we need to split the file into smaller
files.

I don't believe there is a way to do this with Spark, because in order for
Spark to distribute the file to the worker nodes, it first has to be split
up, right?

We ended up using a single machine with a single thread to do the
splitting. I just want to make sure I am not missing something obvious.
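To make the question concrete, this is the kind of Spark job I was imagining (just a sketch; the paths and partition count are made up), and my doubt is whether the initial read of a single 2TB file would itself be distributed:

# Read the big file; my understanding is that Spark can split an
# uncompressed text file into blocks on its own, but not a gzipped one.
rdd = sc.textFile("hdfs:///staging/big_file.txt")   # hypothetical path

# Write it back out as roughly 200 smaller part files.
rdd.repartition(200).saveAsTextFile("hdfs:///staging/big_file_split")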

Thanks!

-- 
Paul Henry Tremblay
Attunix


small job runs out of memory using wholeTextFiles

2017-04-07 Thread Paul Tremblay
As part of my processing, I have the following code:

rdd = sc.wholeTextFiles("s3://paulhtremblay/noaa_tmp/", 10)
rdd.count()

The S3 directory has about 8GB of data and 61,878 files. I am using Spark
2.1 and running the job on EMR with 15 m3.xlarge nodes.

The job fails with this error:

: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 35532 in stage 0.0 failed 4 times, most recent failure: Lost task
35532.3 in stage 0.0 (TID 35543,
ip-172-31-36-192.us-west-2.compute.internal, executor 6):
ExecutorLostFailure (executor 6 exited caused by one of the running
tasks) Reason: Container killed by YARN for exceeding memory limits.
7.4 GB of 5.5 GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.


I have run it dozens of times, increasing the number of partitions and
reducing the size of my data set (the original is 60GB), but I get the same
error each time.
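One thing I have not tried yet is the setting the error message itself suggests. My understanding is that it would be passed when the context is created, something like this (the value is a guess, not tested):

from pyspark import SparkConf, SparkContext

# Raise the off-heap overhead YARN allows per executor (value in MB; a guess).
conf = SparkConf().set("spark.yarn.executor.memoryOverhead", "2048")
sc = SparkContext(conf=conf)

or the equivalent --conf flag on spark-submit.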

In contrast, if I run a simple:

rdd = sc.textFile("s3://paulhtremblay/noaa_tmp/")
rdd.count()

The job finishes in 15 minutes, even with just 3 nodes.

Thanks

-- 
Paul Henry Tremblay
Robert Half Technology


Re: bug with PYTHONHASHSEED

2017-04-05 Thread Paul Tremblay
I saw the bug fix. I am using the latest Spark available on AWS EMR, which I
think is 2.0.1. I am at work and can't check my home config. I don't think
AWS has merged in this fix.
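In the meantime, the workaround I plan to try is to push the variable to the executors through Spark's executor environment settings, roughly like this (untested on my side; the seed value is arbitrary):

from pyspark import SparkConf, SparkContext

# Propagate PYTHONHASHSEED to the Python workers on every executor,
# plus the YARN application master when running in cluster mode.
conf = (SparkConf()
        .set("spark.executorEnv.PYTHONHASHSEED", "1")
        .set("spark.yarn.appMasterEnv.PYTHONHASHSEED", "1"))
sc = SparkContext(conf=conf)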

Henry

On Tue, Apr 4, 2017 at 4:42 PM, Jeff Zhang  wrote:

>
> It is fixed in https://issues.apache.org/jira/browse/SPARK-13330
>
>
>
> Holden Karau 于2017年4月5日周三 上午12:03写道:
>
>> Which version of Spark is this (or is it a dev build)? We've recently
>> made some improvements with PYTHONHASHSEED propagation.
>>
>> On Tue, Apr 4, 2017 at 7:49 AM Eike von Seggern  wrote:
>>
>> 2017-04-01 21:54 GMT+02:00 Paul Tremblay :
>>
>> When I try to do a groupByKey() in my spark environment, I get the
>> error described here:
>>
>> http://stackoverflow.com/questions/36798833/what-does-exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh
>>
>> In order to attempt to fix the problem, I set up my ipython environment
>> with the additional line:
>>
>> PYTHONHASHSEED=1
>>
>> When I fire up my ipython shell, and do:
>>
>> In [7]: hash("foo")
>> Out[7]: -2457967226571033580
>>
>> In [8]: hash("foo")
>> Out[8]: -2457967226571033580
>>
>> So my hash function is now seeded so it returns consistent values. But
>> when I do a groupByKey(), I get the same error:
>>
>>
>> Exception: Randomness of hash of string should be disabled via
>> PYTHONHASHSEED
>>
>> Anyone know how to fix this problem in python 3.4?
>>
>>
>> Independent of the python version, you have to ensure that Python on
>> spark-master and -workers is started with PYTHONHASHSEED set, e.g. by
>> adding it to the environment of the spark processes.
>>
>> Best
>>
>> Eike
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>


-- 
Paul Henry Tremblay
Robert Half Technology


Re: bug with PYTHONHASHSEED

2017-04-04 Thread Paul Tremblay
So that means I have to pass that environment variable to the EMR cluster
when I spin it up, not afterwards. I'll give that a go.
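Concretely, what I plan to pass when creating the cluster is something like the configuration block below (shown as the Python structure I would hand to boto3, or paste as JSON in the console). The spark-env / export classification is my best guess from the EMR docs and I have not verified it yet:

# Hypothetical EMR cluster configuration to export PYTHONHASHSEED on every node.
emr_configurations = [
    {
        "Classification": "spark-env",
        "Configurations": [
            {
                "Classification": "export",
                "Properties": {"PYTHONHASHSEED": "1"},
            }
        ],
    }
]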

Thanks!

Henry

On Tue, Apr 4, 2017 at 7:49 AM, Eike von Seggern 
wrote:

> 2017-04-01 21:54 GMT+02:00 Paul Tremblay :
>
>> When I try to do a groupByKey() in my spark environment, I get the
>> error described here:
>>
>> http://stackoverflow.com/questions/36798833/what-does-exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh
>>
>> In order to attempt to fix the problem, I set up my ipython environment
>> with the additional line:
>>
>> PYTHONHASHSEED=1
>>
>> When I fire up my ipython shell, and do:
>>
>> In [7]: hash("foo")
>> Out[7]: -2457967226571033580
>>
>> In [8]: hash("foo")
>> Out[8]: -2457967226571033580
>>
>> So my hash function is now seeded so it returns consistent values. But
>> when I do a groupByKey(), I get the same error:
>>
>>
>> Exception: Randomness of hash of string should be disabled via
>> PYTHONHASHSEED
>>
>> Anyone know how to fix this problem in python 3.4?
>>
>
> Independent of the python version, you have to ensure that Python on
> spark-master and -workers is started with PYTHONHASHSEED set, e.g. by
> adding it to the environment of the spark processes.
>
> Best
>
> Eike
>



-- 
Paul Henry Tremblay
Robert Half Technology


Re: Alternatives for dataframe collectAsList()

2017-04-03 Thread Paul Tremblay
What do you want to do with the results of the query?
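Depending on the answer, something like toLocalIterator may already be enough (a pyspark sketch below; the Java Dataset API has an equivalent method), since it pulls the result back to the driver one partition at a time instead of all at once. This is a sketch only, not tested against your data:

df = spark.sql("query")              # placeholder query, as in your example

# Iterate over the result rows without materializing the whole dataset
# in driver memory at once.
for row in df.toLocalIterator():
    handle(row)                      # 'handle' stands in for your own logic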

Henry

On Wed, Mar 29, 2017 at 12:00 PM, szep.laszlo.it 
wrote:

> Hi,
>
> after I created a dataset
>
> Dataset<Row> df = sqlContext.sql("query");
>
> I need to have a result values and I call a method: collectAsList()
>
> List<Row> list = df.collectAsList();
>
> But it's very slow if I work with large datasets (20-30 million records). I
> know why it takes so long: collectAsList() collects all the data from the
> worker nodes into the driver app.
>
> But then what is the right way to get the result values? Is there another
> way to iterate over the rows of a result dataset, or to get the values? Can
> anyone post a small, working example?
>
> Thanks & Regards,
> Laszlo Szep
>
>
>


-- 
Paul Henry Tremblay
Robert Half Technology


Re: Read file and represent rows as Vectors

2017-04-03 Thread Paul Tremblay
So if I am understanding your problem, you have the data in CSV files, but
the CSV files are gzipped? If so, Spark can read a gzipped file directly.
Sorry if I didn't understand your question.
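For example (the path is made up), a gzipped text file can be read directly and the decompression is handled for you:

rdd = sc.textFile("s3://my-bucket/docword.txt.gz")   # hypothetical path
print(rdd.take(5))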

Henry

On Mon, Apr 3, 2017 at 5:05 AM, Old-School 
wrote:

> I have a dataset that contains DocID, WordID and frequency (count) as shown
> below. Note that the first three numbers represent 1. the number of
> documents, 2. the number of words in the vocabulary and 3. the total number
> of words in the collection.
>
> 189
> 1430
> 12300
> 1 2 1
> 1 39 1
> 1 42 3
> 1 77 1
> 1 95 1
> 1 96 1
> 2 105 1
> 2 108 1
> 3 133 3
>
>
> What I want to do is to read the data (ignore the first three lines),
> combine the words per document and finally represent each document as a
> vector that contains the frequency of the wordID.
>
> Based on the above dataset the representation of documents 1, 2 and 3 will
> be (note that vocab_size can be extracted by the second line of the data):
>
> val data = Array(
> Vectors.sparse(vocab_size, Seq((2, 1.0), (39, 1.0), (42, 3.0), (77, 1.0),
> (95, 1.0), (96, 1.0))),
> Vectors.sparse(vocab_size, Seq((105, 1.0), (108, 1.0))),
> Vectors.sparse(vocab_size, Seq((133, 3.0))))
>
>
> The problem is that I am not quite sure how to read the .txt.gz file as an
> RDD and create an Array of sparse vectors as described above. Please note
> that I actually want to pass the data array to the PCA transformer.
>
>
>
>
>


-- 
Paul Henry Tremblay
Robert Half Technology


Re: Looking at EMR Logs

2017-04-02 Thread Paul Tremblay
Thanks. That seems to work great, except that EMR doesn't always copy the
logs to S3. The behavior seems inconsistent and I am debugging it now.
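For reference, this is how I am currently setting it when building the context, following the suggestion below (the bucket name is made up, and the directory has to exist beforehand):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.eventLog.enabled", "true")
        .set("spark.eventLog.dir", "s3://my-log-bucket/spark-events"))
sc = SparkContext(conf=conf)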

On Fri, Mar 31, 2017 at 7:46 AM, Vadim Semenov 
wrote:

> You can provide your own log directory, where Spark log will be saved, and
> that you could replay afterwards.
>
> Set in your job this: `spark.eventLog.dir=s3://bucket/some/directory` and
> run it.
> Note! The path `s3://bucket/some/directory` must exist before you run your
> job, it'll not be created automatically.
>
> The Spark HistoryServer on EMR won't show you anything because it's
> looking for logs in `hdfs:///var/log/spark/apps` by default.
>
> After that you can either copy the log files from s3 to the hdfs path
> above, or you can copy them locally to `/tmp/spark-events` (the default
> directory for spark logs) and run the history server like:
> ```
> cd /usr/local/src/spark-1.6.1-bin-hadoop2.6
> sbin/start-history-server.sh
> ```
> and then open http://localhost:18080
>
>
>
>
> On Thu, Mar 30, 2017 at 8:45 PM, Paul Tremblay 
> wrote:
>
>> I am looking for tips on evaluating my Spark job after it has run.
>>
>> I know that right now I can look at the history of jobs through the web
>> ui. I also know how to look at the current resources being used by a
>> similar web ui.
>>
>> However, I would like to look at the logs after the job is finished to
>> evaluate such things as how many tasks were completed, how many executors
>> were used, etc. I currently save my logs to S3.
>>
>> Thanks!
>>
>> Henry
>>
>> --
>> Paul Henry Tremblay
>> Robert Half Technology
>>
>
>


-- 
Paul Henry Tremblay
Robert Half Technology


bug with PYTHONHASHSEED

2017-04-01 Thread Paul Tremblay
When I try to do a groupByKey() in my spark environment, I get the error
described here:

http://stackoverflow.com/questions/36798833/what-does-exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh

In order to attempt to fix the problem, I set up my ipython environment
with the additional line:

PYTHONHASHSEED=1

When I fire up my ipython shell, and do:

In [7]: hash("foo")
Out[7]: -2457967226571033580

In [8]: hash("foo")
Out[8]: -2457967226571033580

So my hash function is now seeded so it returns consistent values. But when
I do a groupByKey(), I get the same error:


Exception: Randomness of hash of string should be disabled via
PYTHONHASHSEED

Anyone know how to fix this problem in python 3.4?

Thanks

Henry

-- 
Paul Henry Tremblay
Robert Half Technology


pyspark bug with PYTHONHASHSEED

2017-04-01 Thread Paul Tremblay
When I try to do a groupByKey() in my spark environment, I get the error
described here:

http://stackoverflow.com/questions/36798833/what-does-exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh

In order to attempt to fix the problem, I set up my ipython environment
with the additional line:

PYTHONHASHSEED=1

When I fire up my ipython shell, and do:

In [7]: hash("foo")
Out[7]: -2457967226571033580

In [8]: hash("foo")
Out[8]: -2457967226571033580

So my hash function is now seeded so it returns consistent values. But when
I do a groupByKey(), I get the same error:


Exception: Randomness of hash of string should be disabled via
PYTHONHASHSEED

Anyone know how to fix this problem in python 3.4?

Thanks

Henry


-- 
Paul Henry Tremblay
Robert Half Technology


Looking at EMR Logs

2017-03-30 Thread Paul Tremblay
I am looking for tips on evaluating my Spark job after it has run.

I know that right now I can look at the history of jobs through the web ui.
I also know how to look at the current resources being used by a similar
web ui.

However, I would like to look at the logs after the job is finished to
evaluate such things as how many tasks were completed, how many executors
were used, etc. I currently save my logs to S3.

Thanks!

Henry

-- 
Paul Henry Tremblay
Robert Half Technology


Re: wholeTextFiles fails, but textFile succeeds for same path

2017-02-11 Thread Paul Tremblay
I've been working on this problem for several days (partly to increase my
knowledge of Spark). The code you linked to hangs because, after reading in
the file, I have to gunzip it.


Another way that seems to be working is reading each file in with
sc.textFile, writing it to HDFS, and then using wholeTextFiles on the HDFS
result.
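Roughly, the workaround looks like this (the paths are placeholders):

# Step 1: read the gzipped objects as line-oriented text and stage them on HDFS.
sc.textFile("s3://my-bucket/warc-files/")\
  .saveAsTextFile("hdfs:///staging/warc_text")

# Step 2: read the staged copy back with wholeTextFiles, which returns
# (filename, contents) pairs.
rdd = sc.wholeTextFiles("hdfs:///staging/warc_text")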


But the bigger issue is that neither method executes in parallel. When I
open my YARN manager, it shows that only one node is being used.



Henry


On 02/06/2017 03:39 PM, Jon Gregg wrote:
Strange that it's working for some directories but not others.  Looks 
like wholeTextFiles maybe doesn't work with S3? 
https://issues.apache.org/jira/browse/SPARK-4414 .


If it's possible to load the data into EMR and run Spark from there, that
may be a workaround. This blog post shows a Python workaround that might
work as well:
http://michaelryanbell.com/processing-whole-files-spark-s3.html


Jon


On Mon, Feb 6, 2017 at 6:38 PM, Paul Tremblay  wrote:


I've actually been able to trace the problem to the files being
read in. If I change to a different directory, then I don't get
the error. Is one of the executors running out of memory?





    On 02/06/2017 02:35 PM, Paul Tremblay wrote:

When I try to create an rdd using wholeTextFiles, I get an
incomprehensible error. But when I use the same path with
sc.textFile, I get no error.

I am using pyspark with spark 2.1.

in_path = 's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/'

rdd = sc.wholeTextFiles(in_path)

rdd.take(1)


/usr/lib/spark/python/pyspark/rdd.py in take(self, num)
   1341
   1342 p = range(partsScanned, min(partsScanned +
numPartsToTry, totalParts))
-> 1343 res = self.context.runJob(self,
takeUpToNumLeft, p)
   1344
   1345 items += res

/usr/lib/spark/python/pyspark/context.py in runJob(self, rdd,
partitionFunc, partitions, allowLocal)
963 # SparkContext#runJob.
964 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 965 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
966 return list(_load_from_socket(port,
mappedRDD._jrdd_deserializer))
967

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py
in __call__(self, *args)
   1131 answer = self.gateway_client.send_command(command)
   1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
   1134
   1135 for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
 61 def deco(*a, **kw):
 62 try:
---> 63 return f(*a, **kw)
 64 except py4j.protocol.Py4JJavaError as e:
 65 s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling
{0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(

Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0 in stage 1.0 failed 4 times, most recent
failure: Lost task 0.3 in stage 1.0 (TID 7,
ip-172-31-45-114.us-west-2.compute.internal, executor
8): ExecutorLostFailure (executor 8 exited caused by one of
the running tasks) Reason: Container marked as failed:
container_1486415078210_0005_01_16 on host:
ip-172-31-45-114.us-west-2.compute.internal. Exit
status: 52. Diagnostics: Exception from container-launch.
Container id: container_1486415078210_0005_01_16
Exit code: 52
Stack trace: ExitCodeException exitCode=52:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at

org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at

org.apache.hadoop.yarn.server

Re: Turning rows into columns

2017-02-11 Thread Paul Tremblay

Yes, that's what I need. Thanks.
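For anyone else reading along, here is the rough shape of what I ended up with (a sketch only; the record splitting is a simplification of the real WARC format, and the path is made up):

def to_records(file_contents):
    # Each record is a run of "Key: Value" header lines; blank lines end a record.
    headers = {}
    for line in file_contents.splitlines():
        if ": " in line:
            key, value = line.split(": ", 1)
            headers[key.strip().lower()] = value.strip()
        elif not line.strip() and headers:
            yield headers
            headers = {}
    if headers:
        yield headers

rdd = sc.wholeTextFiles("s3://my-bucket/warc-sample/")   # hypothetical path
records = rdd.flatMap(lambda name_contents: to_records(name_contents[1]))

From there the per-record dicts can be turned into Rows and then a DataFrame.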


P.


On 02/05/2017 12:17 PM, Koert Kuipers wrote:
Since there is no key to group by and assemble records, I would suggest
writing this in RDD land and then converting to a data frame. You can use
sc.wholeTextFiles to process the text files and create a state machine.


On Feb 4, 2017 16:25, "Paul Tremblay"  wrote:


I am using pyspark 2.1 and am wondering how to convert a flat
file, with one record per row, into a columnar format.

Here is an example of the data:

u'WARC/1.0',
 u'WARC-Type: warcinfo',
 u'WARC-Date: 2016-12-08T13:00:23Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 344',
 u'Content-Type: application/warc-fields',
 u'WARC-Filename: CC-MAIN-20161202170900-0-ip-10-31-129-80.ec2.internal.warc.gz',
 u'',
 u'robots: classic',
 u'hostname: ip-10-31-129-80.ec2.internal',
 u'software: Nutch 1.6 (CC)/CC WarcExport 1.0',
 u'isPartOf: CC-MAIN-2016-50',
 u'operator: CommonCrawl Admin',
 u'description: Wide crawl of the web for November 2016',
 u'publisher: CommonCrawl',
 u'format: WARC File Format 1.0',
 u'conformsTo: http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf',
 u'',
 u'',
 u'WARC/1.0',
 u'WARC-Type: request',
 u'WARC-Date: 2016-12-02T17:54:09Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 220',
 u'Content-Type: application/http; msgtype=request',
 u'WARC-Warcinfo-ID: ',
 u'WARC-IP-Address: 217.197.115.133',
 u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/',
 u'',
 u'GET /blog/ HTTP/1.0',
 u'Host: 1018201.vkrugudruzei.ru',
 u'Accept-Encoding: x-gzip, gzip, deflate',
 u'User-Agent: CCBot/2.0 (http://commoncrawl.org/faq/)',
 u'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
 u'',
 u'',
 u'',
 u'WARC/1.0',
 u'WARC-Type: response',
 u'WARC-Date: 2016-12-02T17:54:09Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 577',
 u'Content-Type: application/http; msgtype=response',
 u'WARC-Warcinfo-ID: ',
 u'WARC-Concurrent-To: ',
 u'WARC-IP-Address: 217.197.115.133',
 u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/',
 u'WARC-Payload-Digest: sha1:Y4TZFLB6UTXHU4HUVONBXC5NZQW2LYMM',
 u'WARC-Block-Digest: sha1:3J7HHBMWTSC7W53DDB7BHTUVPM26QS4B',
 u'']

I want to convert it to something like:
{warc-type='request',warc-date='2016-12-02'.
ward-record-id='
-- 
Paul Henry Tremblay

Robert Half Technology






Re: wholeTextFiles fails, but textFile succeeds for same path

2017-02-06 Thread Paul Tremblay
I've actually been able to trace the problem to the files being read in. 
If I change to a different directory, then I don't get the error. Is one 
of the executors running out of memory?





On 02/06/2017 02:35 PM, Paul Tremblay wrote:
When I try to create an rdd using wholeTextFiles, I get an 
incomprehensible error. But when I use the same path with sc.textFile, 
I get no error.


I am using pyspark with spark 2.1.

in_path = 's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/'


rdd = sc.wholeTextFiles(in_path)

rdd.take(1)


/usr/lib/spark/python/pyspark/rdd.py in take(self, num)
   1341
   1342 p = range(partsScanned, min(partsScanned + 
numPartsToTry, totalParts))

-> 1343 res = self.context.runJob(self, takeUpToNumLeft, p)
   1344
   1345 items += res

/usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, 
partitionFunc, partitions, allowLocal)

963 # SparkContext#runJob.
964 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 965 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), 
mappedRDD._jrdd, partitions)
966 return list(_load_from_socket(port, 
mappedRDD._jrdd_deserializer))

967

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in 
__call__(self, *args)

   1131 answer = self.gateway_client.send_command(command)
   1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, 
self.name)

   1134
   1135 for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
 61 def deco(*a, **kw):
 62 try:
---> 63 return f(*a, **kw)
 64 except py4j.protocol.Py4JJavaError as e:
 65 s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in 
get_return_value(answer, gateway_client, target_id, name)

317 raise Py4JJavaError(
318 "An error occurred while calling 
{0}{1}{2}.\n".

--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 
in stage 1.0 (TID 7, ip-172-31-45-114.us-west-2.compute.internal, 
executor 8): ExecutorLostFailure (executor 8 exited caused by one of 
the running tasks) Reason: Container marked as failed: 
container_1486415078210_0005_01_16 on host: 
ip-172-31-45-114.us-west-2.compute.internal. Exit status: 52. 
Diagnostics: Exception from container-launch.

Container id: container_1486415078210_0005_01_16
Exit code: 52
Stack trace: ExitCodeException exitCode=52:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

rdd = sc.textFile(in_path)

In [8]: rdd.take(1)
Out[8]: [u'WARC/1.0']







wholeTextFiles fails, but textFile succeeds for same path

2017-02-06 Thread Paul Tremblay
When I try to create an rdd using wholeTextFiles, I get an 
incomprehensible error. But when I use the same path with sc.textFile, I 
get no error.


I am using pyspark with spark 2.1.

in_path = 's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/'


rdd = sc.wholeTextFiles(in_path)

rdd.take(1)


/usr/lib/spark/python/pyspark/rdd.py in take(self, num)
   1341
   1342 p = range(partsScanned, min(partsScanned + 
numPartsToTry, totalParts))

-> 1343 res = self.context.runJob(self, takeUpToNumLeft, p)
   1344
   1345 items += res

/usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, 
partitionFunc, partitions, allowLocal)

963 # SparkContext#runJob.
964 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 965 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), 
mappedRDD._jrdd, partitions)
966 return list(_load_from_socket(port, 
mappedRDD._jrdd_deserializer))

967

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in 
__call__(self, *args)

   1131 answer = self.gateway_client.send_command(command)
   1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
   1134
   1135 for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
 61 def deco(*a, **kw):
 62 try:
---> 63 return f(*a, **kw)
 64 except py4j.protocol.Py4JJavaError as e:
 65 s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in 
get_return_value(answer, gateway_client, target_id, name)

317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 
in stage 1.0 (TID 7, ip-172-31-45-114.us-west-2.compute.internal, 
executor 8): ExecutorLostFailure (executor 8 exited caused by one of the 
running tasks) Reason: Container marked as failed: 
container_1486415078210_0005_01_16 on host: 
ip-172-31-45-114.us-west-2.compute.internal. Exit status: 52. 
Diagnostics: Exception from container-launch.

Container id: container_1486415078210_0005_01_16
Exit code: 52
Stack trace: ExitCodeException exitCode=52:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

rdd = sc.textFile(in_path)

In [8]: rdd.take(1)
Out[8]: [u'WARC/1.0']





Turning rows into columns

2017-02-04 Thread Paul Tremblay
I am using pyspark 2.1 and am wondering how to convert a flat file, with
one record per row, into a columnar format.

Here is an example of the data:

u'WARC/1.0',
 u'WARC-Type: warcinfo',
 u'WARC-Date: 2016-12-08T13:00:23Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 344',
 u'Content-Type: application/warc-fields',
 u'WARC-Filename:
CC-MAIN-20161202170900-0-ip-10-31-129-80.ec2.internal.warc.gz',
 u'',
 u'robots: classic',
 u'hostname: ip-10-31-129-80.ec2.internal',
 u'software: Nutch 1.6 (CC)/CC WarcExport 1.0',
 u'isPartOf: CC-MAIN-2016-50',
 u'operator: CommonCrawl Admin',
 u'description: Wide crawl of the web for November 2016',
 u'publisher: CommonCrawl',
 u'format: WARC File Format 1.0',
 u'conformsTo:
http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf',
 u'',
 u'',
 u'WARC/1.0',
 u'WARC-Type: request',
 u'WARC-Date: 2016-12-02T17:54:09Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 220',
 u'Content-Type: application/http; msgtype=request',
 u'WARC-Warcinfo-ID: ',
 u'WARC-IP-Address: 217.197.115.133',
 u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/',
 u'',
 u'GET /blog/ HTTP/1.0',
 u'Host: 1018201.vkrugudruzei.ru',
 u'Accept-Encoding: x-gzip, gzip, deflate',
 u'User-Agent: CCBot/2.0 (http://commoncrawl.org/faq/)',
 u'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
 u'',
 u'',
 u'',
 u'WARC/1.0',
 u'WARC-Type: response',
 u'WARC-Date: 2016-12-02T17:54:09Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 577',
 u'Content-Type: application/http; msgtype=response',
 u'WARC-Warcinfo-ID: ',
 u'WARC-Concurrent-To: ',
 u'WARC-IP-Address: 217.197.115.133',
 u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/',
 u'WARC-Payload-Digest: sha1:Y4TZFLB6UTXHU4HUVONBXC5NZQW2LYMM',
 u'WARC-Block-Digest: sha1:3J7HHBMWTSC7W53DDB7BHTUVPM26QS4B',
 u'']

I want to convert it to something like:
{warc-type='request',warc-date='2016-12-02'.
ward-record-id='