What Davies said is correct: the second argument is the Hadoop output format
class. Hadoop supports many types of output formats, each with its own
advantages. Apart from the SequenceFileOutputFormat you specified,
FileOutputFormat is another such class:
https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.html
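
For example, here is a minimal sketch of saving the same (date, count) pairs
with the old mapred API's TextOutputFormat instead of a sequence file (the
output path below is hypothetical, and as Davies said the directory must not
already exist):

```
# Save the pairs as plain text; keyClass/valueClass name the Writable
# types the Python keys and values are converted to on the JVM side.
counts.saveAsHadoopFile(
    "hdfs://192.168.10.130:9000/dev/output/test_text",  # hypothetical path
    "org.apache.hadoop.mapred.TextOutputFormat",
    keyClass="org.apache.hadoop.io.Text",
    valueClass="org.apache.hadoop.io.IntWritable")
```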


Thanks,

Prashant Sharma



On Wed, Apr 27, 2016 at 5:22 AM, Davies Liu <dav...@databricks.com> wrote:

> hdfs://192.168.10.130:9000/dev/output/test already exists, so you need
> to remove it first.
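>
> For example, a minimal sketch using PySpark's JVM gateway (sc._jvm and
> sc._jsc are internal handles; you could equally run `hdfs dfs -rm -r
> <path>` from a shell):
>
> # Recursively delete the existing output directory before saving again.
> p = sc._jvm.org.apache.hadoop.fs.Path(
>     "hdfs://192.168.10.130:9000/dev/output/test")
> fs = p.getFileSystem(sc._jsc.hadoopConfiguration())
> if fs.exists(p):
>     fs.delete(p, True)  # True = delete recursively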
>
> On Tue, Apr 26, 2016 at 5:28 AM, Luke Adolph <kenan3...@gmail.com> wrote:
> > Hi, all:
> > Below is my code:
> >
> > from pyspark import *
> > import re
> >
> > def getDateByLine(input_str):
> >     str_pattern = r'^\d{4}-\d{2}-\d{2}'
> >     pattern = re.compile(str_pattern)
> >     match = pattern.match(input_str)
> >     if match:
> >         return match.group()
> >     else:
> >         return None
> >
> > file_url = "hdfs://192.168.10.130:9000/dev/test/test.log"
> > input_file = sc.textFile(file_url)
> > line = input_file.filter(getDateByLine).map(lambda x: (x[:10], 1))
> > counts = line.reduceByKey(lambda a,b: a+b)
> > print counts.collect()
> > counts.saveAsHadoopFile("hdfs://192.168.10.130:9000/dev/output/test",
> >                         "org.apache.hadoop.mapred.SequenceFileOutputFormat")
> >
> >
> > What confuses me is the method saveAsHadoopFile. I have read the PySpark
> > API docs, but I still don't understand what the second argument means.
> >
> > Below is the output when I run the above code:
> > ```
> >
> > [(u'2016-02-29', 99), (u'2016-03-02', 30)]
> >
> >
> > ---------------------------------------------------------------------------
> > Py4JJavaError                             Traceback (most recent call last)
> > <ipython-input-6-0a091f21bf99> in <module>()
> >      18 counts = line.reduceByKey(lambda a,b: a+b)
> >      19 print counts.collect()
> > ---> 20 counts.saveAsHadoopFile("hdfs://192.168.10.130:9000/dev/output/test", "org.apache.hadoop.mapred.SequenceFileOutputFormat")
> >
> > /mydata/softwares/spark-1.6.1/python/pyspark/rdd.pyc in saveAsHadoopFile(self, path, outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, compressionCodecClass)
> >    1419                                                  keyClass, valueClass,
> >    1420                                                  keyConverter, valueConverter,
> > -> 1421                                                  jconf, compressionCodecClass)
> >    1422
> >    1423     def saveAsSequenceFile(self, path, compressionCodecClass=None):
> >
> > /mydata/softwares/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
> >     811         answer = self.gateway_client.send_command(command)
> >     812         return_value = get_return_value(
> > --> 813             answer, self.gateway_client, self.target_id, self.name)
> >     814
> >     815         for temp_arg in temp_args:
> >
> > /mydata/softwares/spark-1.6.1/python/pyspark/sql/utils.pyc in deco(*a, **kw)
> >      43     def deco(*a, **kw):
> >      44         try:
> > ---> 45             return f(*a, **kw)
> >      46         except py4j.protocol.Py4JJavaError as e:
> >      47             s = e.java_exception.toString()
> >
> > /mydata/softwares/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
> >     306                 raise Py4JJavaError(
> >     307                     "An error occurred while calling {0}{1}{2}.\n".
> > --> 308                     format(target_id, ".", name), value)
> >     309             else:
> >     310                 raise Py4JError(
> >
> > Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsHadoopFile.
> > : org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://192.168.10.130:9000/dev/output/test already exists
> >       at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)
> >       at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1179)
> >       at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
> >       at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
> >       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> >       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> >       at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> >       at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1156)
> >       at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1060)
> >       at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
> >       at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
> >       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> >       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> >       at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> >       at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
> >       at org.apache.spark.api.python.PythonRDD$.saveAsHadoopFile(PythonRDD.scala:753)
> >       at org.apache.spark.api.python.PythonRDD.saveAsHadoopFile(PythonRDD.scala)
> >       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >       at java.lang.reflect.Method.invoke(Method.java:483)
> >       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
> >       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
> >       at py4j.Gateway.invoke(Gateway.java:259)
> >       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> >       at py4j.commands.CallCommand.execute(CallCommand.java:79)
> >       at py4j.GatewayConnection.run(GatewayConnection.java:209)
> >       at java.lang.Thread.run(Thread.java:745)
> >
> > ```
> >
> > Can you help me, Thanks !
> > --
> > Thanks & Best Regards
> > 卢文泉 | Adolph Lu
> > TEL:+86 15651006559
> > Linker Networks(http://www.linkernetworks.com/)
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
