What Davies said is correct: the second argument is Hadoop's output format class. Hadoop supports many types of output formats, and each has its own advantages. Apart from the one specified above, FileOutputFormat (https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.html) is one such class.
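For instance, here is a minimal sketch (the pair data and output paths are illustrative, not from the original job) showing that swapping the output format class name changes what gets written to HDFS:

```
# Hypothetical pair RDD, standing in for the `counts` RDD from the thread.
pairs = sc.parallelize([(u'2016-02-29', 99), (u'2016-03-02', 30)])

# Write the pairs as a binary SequenceFile (the format used in the question):
pairs.saveAsHadoopFile("hdfs://192.168.10.130:9000/dev/output/seq",
                       "org.apache.hadoop.mapred.SequenceFileOutputFormat")

# Swap in another FileOutputFormat subclass to write plain "key<TAB>value" text:
pairs.saveAsHadoopFile("hdfs://192.168.10.130:9000/dev/output/text",
                       "org.apache.hadoop.mapred.TextOutputFormat")
```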
thanks,
Prashant Sharma

On Wed, Apr 27, 2016 at 5:22 AM, Davies Liu <dav...@databricks.com> wrote:
> hdfs://192.168.10.130:9000/dev/output/test already exists, so you need
> to remove it first.
>
> On Tue, Apr 26, 2016 at 5:28 AM, Luke Adolph <kenan3...@gmail.com> wrote:
> > Hi, all:
> > Below is my code:
> >
> > from pyspark import *
> > import re
> >
> > def getDateByLine(input_str):
> >     str_pattern = '^\d{4}-\d{2}-\d{2}'
> >     pattern = re.compile(str_pattern)
> >     match = pattern.match(input_str)
> >     if match:
> >         return match.group()
> >     else:
> >         return None
> >
> > file_url = "hdfs://192.168.10.130:9000/dev/test/test.log"
> > input_file = sc.textFile(file_url)
> > line = input_file.filter(getDateByLine).map(lambda x: (x[:10], 1))
> > counts = line.reduceByKey(lambda a,b: a+b)
> > print counts.collect()
> > counts.saveAsHadoopFile("hdfs://192.168.10.130:9000/dev/output/test",\
> >     "org.apache.hadoop.mapred.SequenceFileOutputFormat")
> >
> > What confuses me is the method saveAsHadoopFile. I have read the pyspark API,
> > but I still don't understand what the second arg means.
> >
> > Below is the output when I run the above code:
> > ```
> > [(u'2016-02-29', 99), (u'2016-03-02', 30)]
> >
> > ---------------------------------------------------------------------------
> > Py4JJavaError                             Traceback (most recent call last)
> > <ipython-input-6-0a091f21bf99> in <module>()
> >      18 counts = line.reduceByKey(lambda a,b: a+b)
> >      19 print counts.collect()
> > ---> 20 counts.saveAsHadoopFile("hdfs://192.168.10.130:9000/dev/output/test",
> >              "org.apache.hadoop.mapred.SequenceFileOutputFormat")
> >
> > /mydata/softwares/spark-1.6.1/python/pyspark/rdd.pyc in saveAsHadoopFile(self, path, outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, compressionCodecClass)
> >    1419                                              keyClass, valueClass,
> >    1420                                              keyConverter, valueConverter,
> > -> 1421                                              jconf, compressionCodecClass)
> >    1422
> >    1423     def saveAsSequenceFile(self, path, compressionCodecClass=None):
> >
> > /mydata/softwares/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
> >     811         answer = self.gateway_client.send_command(command)
> >     812         return_value = get_return_value(
> > --> 813             answer, self.gateway_client, self.target_id, self.name)
> >     814
> >     815         for temp_arg in temp_args:
> >
> > /mydata/softwares/spark-1.6.1/python/pyspark/sql/utils.pyc in deco(*a, **kw)
> >      43     def deco(*a, **kw):
> >      44         try:
> > ---> 45             return f(*a, **kw)
> >      46         except py4j.protocol.Py4JJavaError as e:
> >      47             s = e.java_exception.toString()
> >
> > /mydata/softwares/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
> >     306                 raise Py4JJavaError(
> >     307                     "An error occurred while calling {0}{1}{2}.\n".
> > --> 308                     format(target_id, ".", name), value)
> >     309             else:
> >     310                 raise Py4JError(
> >
> > Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsHadoopFile.
> > : org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://192.168.10.130:9000/dev/output/test already exists
> >     at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)
> >     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1179)
> >     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
> >     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
> >     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> >     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> >     at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> >     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1156)
> >     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1060)
> >     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
> >     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
> >     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> >     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> >     at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> >     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
> >     at org.apache.spark.api.python.PythonRDD$.saveAsHadoopFile(PythonRDD.scala:753)
> >     at org.apache.spark.api.python.PythonRDD.saveAsHadoopFile(PythonRDD.scala)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >     at java.lang.reflect.Method.invoke(Method.java:483)
> >     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
> >     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
> >     at py4j.Gateway.invoke(Gateway.java:259)
> >     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> >     at py4j.commands.CallCommand.execute(CallCommand.java:79)
> >     at py4j.GatewayConnection.run(GatewayConnection.java:209)
> >     at java.lang.Thread.run(Thread.java:745)
> > ```
> >
> > Can you help me? Thanks!
> > --
> > Thanks & Best Regards
> > 卢文泉 | Adolph Lu
> > TEL: +86 15651006559
> > Linker Networks (http://www.linkernetworks.com/)
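For the FileAlreadyExistsException above: the mapred FileOutputFormat refuses to write into an existing directory, so the output path has to be removed before re-running the job. A minimal sketch of one way to do that from the driver, assuming the `hdfs` command-line client is on the PATH (the path below is the one from the thread):

```
import subprocess

# Clear the previous run's output first; -f suppresses the error if the
# directory does not exist yet, -skipTrash deletes it immediately.
output_dir = "hdfs://192.168.10.130:9000/dev/output/test"
subprocess.call(["hdfs", "dfs", "-rm", "-r", "-f", "-skipTrash", output_dir])

# Now the save succeeds because the output directory is gone.
counts.saveAsHadoopFile(output_dir,
                        "org.apache.hadoop.mapred.SequenceFileOutputFormat")
```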