Hi all,
Below is my code:

```
from pyspark import *
import re

def getDateByLine(input_str):
    # Return the leading YYYY-MM-DD date of a log line, or None if absent.
    str_pattern = r'^\d{4}-\d{2}-\d{2}'
    pattern = re.compile(str_pattern)
    match = pattern.match(input_str)
    if match:
        return match.group()
    else:
        return None

file_url = "hdfs://192.168.10.130:9000/dev/test/test.log"
input_file = sc.textFile(file_url)  # sc is the SparkContext from the pyspark shell
# Keep only lines that start with a date, then count lines per date.
line = input_file.filter(getDateByLine).map(lambda x: (x[:10], 1))
counts = line.reduceByKey(lambda a, b: a + b)
print counts.collect()
counts.saveAsHadoopFile("hdfs://192.168.10.130:9000/dev/output/test",
                        "org.apache.hadoop.mapred.SequenceFileOutputFormat")
```

What confuses me is the method *saveAsHadoopFile*. I have read the PySpark
API docs, but I still don't understand what the second argument means.
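
For context, the traceback below shows the Python-side signature,
saveAsHadoopFile(self, path, outputFormatClass, keyClass, valueClass,
keyConverter, valueConverter, conf, compressionCodecClass), so I assume the
second positional argument is the fully qualified Java class name of the
Hadoop OutputFormat used to write the files. Here is my guess at a fully
explicit call; the Text/IntWritable key and value classes are my own
assumption for (string, int) pairs, not something I found in the docs:

```
# My guess at an explicit call, based on the signature in the traceback
# below. Text/IntWritable are assumed to match my (unicode, int) pairs.
counts.saveAsHadoopFile(
    "hdfs://192.168.10.130:9000/dev/output/test",
    outputFormatClass="org.apache.hadoop.mapred.SequenceFileOutputFormat",
    keyClass="org.apache.hadoop.io.Text",
    valueClass="org.apache.hadoop.io.IntWritable")

# If that reading is right, sequenceFile should be the matching reader
# for SequenceFileOutputFormat (untested sketch).
pairs = sc.sequenceFile("hdfs://192.168.10.130:9000/dev/output/test")
print pairs.collect()
```

Is that the correct way to think about it?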

Below is the output when I run the above code:
```

[(u'2016-02-29', 99), (u'2016-03-02', 30)]

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-6-0a091f21bf99> in <module>()
     18 counts = line.reduceByKey(lambda a,b: a+b)
     19 print counts.collect()
---> 20 counts.saveAsHadoopFile("hdfs://192.168.10.130:9000/dev/output/test",
     21                         "org.apache.hadoop.mapred.SequenceFileOutputFormat")

/mydata/softwares/spark-1.6.1/python/pyspark/rdd.pyc in saveAsHadoopFile(self, path, outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, compressionCodecClass)
   1419                                              keyClass, valueClass,
   1420                                              keyConverter, valueConverter,
-> 1421                                              jconf, compressionCodecClass)
   1422
   1423     def saveAsSequenceFile(self, path, compressionCodecClass=None):

/mydata/softwares/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814
    815         for temp_arg in temp_args:

/mydata/softwares/spark-1.6.1/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

/mydata/softwares/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    306                 raise Py4JJavaError(
    307                     "An error occurred while calling {0}{1}{2}.\n".
--> 308                     format(target_id, ".", name), value)
    309             else:
    310                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsHadoopFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://192.168.10.130:9000/dev/output/test already exists
        at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1179)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1156)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1060)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
        at org.apache.spark.api.python.PythonRDD$.saveAsHadoopFile(PythonRDD.scala:753)
        at org.apache.spark.api.python.PythonRDD.saveAsHadoopFile(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:745)

```
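
Separately, the Py4JJavaError itself looks unrelated to the second argument:
the FileAlreadyExistsException just means the output directory is left over
from an earlier run, since Hadoop output formats refuse to overwrite an
existing directory. A minimal workaround sketch, assuming the hdfs
command-line tool is available on the driver machine:

```
import subprocess

# Remove the stale output directory before saving again
# (assumes the `hdfs` CLI is on the PATH of the driver machine).
subprocess.call(["hdfs", "dfs", "-rm", "-r",
                 "hdfs://192.168.10.130:9000/dev/output/test"])
```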

Can you help me? Thanks!
-- 
Thanks & Best Regards
卢文泉 | Adolph Lu
TEL:+86 15651006559
Linker Networks(http://www.linkernetworks.com/)
