Hi Mekal, thanks for wanting to help. I have attached the Python script as
well as the different exceptions here. I have also pasted the cluster
exception below so I can highlight the relevant parts.

[abosede2@badboy ~]$ spark-submit --master spark://10.160.5.48:7077 trade_data_count.py
Ivy Default Cache set to: /home/abosede2/.ivy2/cache
The jars for the packages stored in: /home/abosede2/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/spark-1.6.1/assembly/target/scala-2.11/spark-assembly-1.6.1-hre/settings/ivysettings.xml
com.databricks#spark-csv_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
        confs: [default]
        found com.databricks#spark-csv_2.11;1.3.0 in central
        found org.apache.commons#commons-csv;1.1 in central
        found com.univocity#univocity-parsers;1.5.1 in central
:: resolution report :: resolve 160ms :: artifacts dl 7ms
        :: modules in use:
        com.databricks#spark-csv_2.11;1.3.0 from central in [default]
        com.univocity#univocity-parsers;1.5.1 from central in [default]
        org.apache.commons#commons-csv;1.1 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
        confs: [default]
        0 artifacts copied, 3 already retrieved (0kB/6ms)
[Stage 0:========================>                              (104 + 8) / 235]16/10/15 19:42:37 ERROR TaskScadboy.win.ad.jhu.edu: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or netwoWARN messages.
16/10/15 19:42:37 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: Master removed our a
[Stage 0:=======================>                             (104 + -28) / 235]Traceback (most recent call la
  File "/home/abosede2/trade_data_count.py", line 79, in <module>
    print("Raw data is %d rows." % data.count())
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 269, in count
  File "/usr/lib/python2.7/site-packages/py4j-0.9.2-py2.7.egg/py4j/java_gateway.py", line 836, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
  File "/usr/lib/python2.7/site-packages/py4j-0.9.2-py2.7.egg/py4j/protocol.py", line 310, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o6867.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Master removed our application: FAILED
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndepend)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
        at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
        at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame
        at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
        at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1515)
        at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1514)
        at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
        at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1514)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Unknown Source)

On Sat, Oct 15, 2016 at 10:06 PM, Mekal Zheng <mekal.zh...@gmail.com> wrote:

> Show me your code
>
>
> On October 16, 2016, 08:24 +0800, hxfeng <980548...@qq.com> wrote:
>
> Show your pi.py code. What is the exception message?
>
>
> ------------------ Original Message ------------------
> *From:* "Tobi Bosede" <ani.to...@gmail.com>
> *Sent:* Sunday, October 16, 2016, 8:04 AM
> *To:* "user" <user@spark.apache.org>
> *Subject:* Spark-submit Problems
>
> Hi everyone,
>
> I am having problems submitting an app through spark-submit when the
> master is not "local". However, the pi.py example that comes with Spark
> works with any master. I believe my script has the same structure as pi.py,
> but for some reason my script is not as flexible. Specifically, the failure
> occurs when count() is called; count() is the first action in the script.
> Also, Spark complains that it is losing executors. However, interactively in
> Jupyter, everything works perfectly with any master passed to SparkConf.
>
> Does anyone know what might be happening? Is there anywhere I can look up
> the requirements for spark-submit scripts?
>
> Thanks,
> Tobi
>
>
[abosede2@badboy ~]$ spark-submit --master local[16] trade_data_count.py
Ivy Default Cache set to: /home/abosede2/.ivy2/cache
The jars for the packages stored in: /home/abosede2/.ivy2/jars
:: loading settings :: url = 
jar:file:/usr/local/spark-1.6.1/assembly/target/scala-2.11/spark-assembly-1.6.1-hadoop2.2.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.databricks#spark-csv_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
        confs: [default]
        found com.databricks#spark-csv_2.11;1.3.0 in central
        found org.apache.commons#commons-csv;1.1 in central
        found com.univocity#univocity-parsers;1.5.1 in central
:: resolution report :: resolve 162ms :: artifacts dl 8ms
        :: modules in use:
        com.databricks#spark-csv_2.11;1.3.0 from central in [default]
        com.univocity#univocity-parsers;1.5.1 from central in [default]
        org.apache.commons#commons-csv;1.1 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
        confs: [default]
        0 artifacts copied, 3 already retrieved (0kB/7ms)
[Stage 0:>                                                       (1 + 16) / 
235]^CTraceback (most recent call last):
  File "/home/abosede2/trade_data_count.py", line 80, in <module>
    print("Raw data is %d rows." % data.count())
  File 
"/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 
269, in count
  File 
"/usr/lib/python2.7/site-packages/py4j-0.9.2-py2.7.egg/py4j/java_gateway.py", 
line 834, in __call__
    answer = self.gateway_client.send_command(command)
  File 
"/usr/lib/python2.7/site-packages/py4j-0.9.2-py2.7.egg/py4j/java_gateway.py", 
line 648, in send_command
    response = connection.send_command(command)
  File 
"/usr/lib/python2.7/site-packages/py4j-0.9.2-py2.7.egg/py4j/java_gateway.py", 
line 763, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib64/python2.7/socket.py", line 430, in readline
    data = recv(1)
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/context.py", line 
225, in signal_handler
KeyboardInterrupt
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    raise EOFError
EOFError
    raise EOFError
    raise EOFError
EOFError
EOFError
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    raise EOFError
EOFError
    raise EOFError
EOFError
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    raise EOFError
EOFError
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    raise EOFError
EOFError
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    raise EOFError
Traceback (most recent call last):
EOFError
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
157, in manager
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 
61, in worker
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 
136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", 
line 545, in read_int
    raise EOFError
EOFError
16/10/15 19:35:43 WARN QueuedThreadPool: 16 threads could not be stopped
16/10/15 19:35:43 ERROR LiveListenerBus: SparkListenerBus has already stopped! 
Dropping event 
SparkListenerTaskEnd(0,0,ShuffleMapTask,TaskKilled,org.apache.spark.scheduler.TaskInfo@73033d30,null)
16/10/15 19:35:43 ERROR LiveListenerBus: SparkListenerBus has already stopped! 
Dropping event 
SparkListenerTaskEnd(0,0,ShuffleMapTask,TaskKilled,org.apache.spark.scheduler.TaskInfo@27c61fb,null)
16/10/15 19:35:43 ERROR LiveListenerBus: SparkListenerBus has already stopped! 
Dropping event 
SparkListenerTaskEnd(0,0,ShuffleMapTask,TaskKilled,org.apache.spark.scheduler.TaskInfo@16a95ce1,null)
16/10/15 19:35:43 ERROR LiveListenerBus: SparkListenerBus has already stopped! 
Dropping event 
SparkListenerTaskEnd(0,0,ShuffleMapTask,TaskKilled,org.apache.spark.scheduler.TaskInfo@1396f6c8,null)
16/10/15 19:35:43 ERROR LiveListenerBus: SparkListenerBus has already stopped! 
Dropping event 
SparkListenerTaskEnd(0,0,ShuffleMapTask,TaskKilled,org.apache.spark.scheduler.TaskInfo@7916b337,null)
16/10/15 19:35:43 ERROR LiveListenerBus: SparkListenerBus has already stopped! 
Dropping event 
SparkListenerTaskEnd(0,0,ShuffleMapTask,TaskKilled,org.apache.spark.scheduler.TaskInfo@18046657,null)
16/10/15 19:35:43 ERROR LiveListenerBus: SparkListenerBus has already stopped! 
Dropping event 
SparkListenerTaskEnd(0,0,ShuffleMapTask,TaskKilled,org.apache.spark.scheduler.TaskInfo@44166fe2,null)
# coding: utf-8

# In[1]:
import numpy as np
import pandas as pd
from os import listdir

from pyspark import SparkContext
from pyspark.sql import SQLContext, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *

if __name__ == "__main__":

    sc = SparkContext(appName="TradeDataCount")
    sqlContext = SQLContext(sc)


# In[3]:
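    # Build the list of input files: every file in big_harvest whose name contains 'Trades'.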

    files = [f for f in listdir('/home/dnaiman1/findata/big_harvest') if 'Trades' in f]
    paths = ['/home/dnaiman1/findata/big_harvest/%s' % i for i in files]


# In[4]:
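    # parseData reads one trade file with an explicit schema and normalizes its date/time columns.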

    def parseData(file_path):
        customSchema = StructType([
            StructField("ProductName", StringType(), True),
            StructField("Maturity", StringType(), True),
            StructField("Date", StringType(), True),
            StructField("Time", StringType(), True),
            StructField("Price", DoubleType(), True),
            StructField("Quantity", IntegerType(), True)])
    
        df = (sqlContext
            .read.format('com.databricks.spark.csv')
            .options(header='false')
            .load(file_path, schema=customSchema))
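        # The helper UDFs below normalize the raw string columns:
        #   maturityConv: 'A/B/C' -> 'C-A-B' (reordered so it casts to a date)
        #   dateConv:     'A:B:C' -> 'A-B-C'
        #   timeConv:     drops the last four characters of the time string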
    
        def maturityConv(date):
            elements=date.split("/")
            return "%s-%s-%s" % (elements[2], elements[0], elements[1])
    
        maturityConvUDF=udf(maturityConv, StringType())
    
        def dateConv(date):
            elements=date.split(":")
            return "%s-%s-%s" % (elements[0], elements[1], elements[2])

        dateConvUDF=udf(dateConv, StringType())
    
        def timeConv(time):
            return time[0:-4]

        timeConvUDF=udf(timeConv, StringType())
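        # Apply the converters to add normalized Maturity/Date/Time columns.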
    
        df2=(df
            .withColumn("MaturityConv", maturityConvUDF(df.Maturity))
            .withColumn("DateConv", dateConvUDF(df.Date))
            .withColumn("TimeConv", timeConvUDF(df.Time)))
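        # Final projection: cast the normalized strings to proper date/timestamp types.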
    
        df3 = (df2
            .select(['ProductName',
                     col('MaturityConv').cast('date').alias('Maturity'),
                     col('DateConv').cast('date').alias('Date'),
                     concat_ws(' ', df2.DateConv, df2.TimeConv).alias('TimeStamp').cast('timestamp'),
                     'Price', 'Quantity']))  # can also use to_date('MaturityConv').alias('Maturity')
    
        return df3


# In[5]:
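    # Parse every file from the ninth path onward into its own DataFrame.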

    parsed_data=[]
    for file in paths[8:]:
        parsed_data.append(parseData(file))


# In[6]:
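    # Stack the per-file DataFrames into a single DataFrame via repeated unionAll.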

    def union(data_list):
        unionDF=data_list[0].unionAll(data_list[1])
        for df in data_list[2:]:
            unionDF=unionDF.unionAll(df)
        return unionDF
    
    data=union(parsed_data).cache() 
    print("Raw data is %d rows." % data.count())


# In[7]:

    max_date=data.select(max('Date')).first()
    print("Max date is %s." % max_date)


# In[9]:
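    # Add an Hour column derived from TimeStamp for the pivot below.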

    data = data.select(['ProductName', 'Maturity', 'Date', 'TimeStamp',
                        hour("TimeStamp").alias("Hour"), 'Price', 'Quantity'])


# In[14]:
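    # Pivot to one row per ProductName/Maturity/Date, with the summed Quantity per hour as columns (missing hours filled with 0).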

    pivot=(data
        .groupBy("ProductName", "Maturity", "Date")     
        .pivot("Hour")
        .agg(sum("Quantity"))
        .orderBy("ProductName", "Maturity", "Date")       
        .na.fill(0)
        .cache())
  
    print("Pivoted data is %d rows." % pivot.count())


# In[15]:
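    # Collect the pivoted result to the driver as pandas, add a row-wise DayTotal column, and write a tab-separated file.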

    pivotDF=pivot.toPandas()
    pivotDF['DayTotal'] = pivotDF.sum(axis=1)
    pivotDF.to_csv("trades_pivot_%s.txt" % max_date, sep='\t', index=False)
    print("File trades_pivot_%s.txt written to disk." % max_date)


# In[16]:
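    # Use Spark SQL over the pivoted table to list the distinct ProductName/Maturity pairs.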

    pivot.registerTempTable("Trades")
    productList = sqlContext.sql("select distinct ProductName, Maturity from Trades "
                                 "group by ProductName, Maturity order by ProductName, Maturity")
    print("Number of unique products (instrument/maturity pairs) is %d." %
          productList.count())


# In[17]:
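    # Write the product list to a tab-separated file on the driver.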

    products=productList.toPandas()
    products.to_csv("product_list_%s.txt" % max_date, sep="\t", index=False, header=True)
    print("File product_list_%s.txt written to disk." % max_date)


# In[18]:

    sc.stop()