The Py4JJavaError is just wrapping the real problem: the output directory hdfs://192.168.10.130:9000/dev/output/test already exists, so you need to remove it first. Hadoop output formats refuse to overwrite an existing output directory, which is the FileAlreadyExistsException at the bottom of your traceback.

As for the second argument of saveAsHadoopFile: it is the fully qualified class name of the Hadoop OutputFormat (old mapred API) that decides how the RDD's (key, value) pairs are written on disk. Passing "org.apache.hadoop.mapred.SequenceFileOutputFormat" tells Spark to write them as a Hadoop SequenceFile.
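Here is a minimal sketch of the fix, assuming the same SparkContext `sc` and the `counts` RDD from your snippet. It reaches the Hadoop FileSystem API through PySpark's private `_jvm`/`_jsc` handles, so take it as one way to do it rather than an official one:

```
from py4j.java_gateway import java_import

# Make org.apache.hadoop.fs.Path constructible from Python via py4j.
java_import(sc._jvm, "org.apache.hadoop.fs.Path")

out_url = "hdfs://192.168.10.130:9000/dev/output/test"
out_path = sc._jvm.Path(out_url)

# Resolve the FileSystem behind that URL and remove the stale directory.
fs = out_path.getFileSystem(sc._jsc.hadoopConfiguration())
if fs.exists(out_path):
    fs.delete(out_path, True)  # True = delete recursively

# Now the save succeeds; the second argument is just the OutputFormat
# class name, here the old mapred API's SequenceFileOutputFormat.
counts.saveAsHadoopFile(out_url,
                        "org.apache.hadoop.mapred.SequenceFileOutputFormat")
```

From a shell, `hdfs dfs -rm -r hdfs://192.168.10.130:9000/dev/output/test` does the same cleanup.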
On Tue, Apr 26, 2016 at 5:28 AM, Luke Adolph <kenan3...@gmail.com> wrote:
> Hi, all:
> Below is my code:
>
> from pyspark import *
> import re
>
> def getDateByLine(input_str):
>     str_pattern = '^\d{4}-\d{2}-\d{2}'
>     pattern = re.compile(str_pattern)
>     match = pattern.match(input_str)
>     if match:
>         return match.group()
>     else:
>         return None
>
> file_url = "hdfs://192.168.10.130:9000/dev/test/test.log"
> input_file = sc.textFile(file_url)
> line = input_file.filter(getDateByLine).map(lambda x: (x[:10], 1))
> counts = line.reduceByKey(lambda a, b: a + b)
> print counts.collect()
> counts.saveAsHadoopFile("hdfs://192.168.10.130:9000/dev/output/test", \
>     "org.apache.hadoop.mapred.SequenceFileOutputFormat")
>
> What confuses me is the method saveAsHadoopFile. I have read the PySpark
> API docs, but I still don't understand what the second argument means.
>
> Below is the output when I run the above code:
>
> ```
> [(u'2016-02-29', 99), (u'2016-03-02', 30)]
>
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-6-0a091f21bf99> in <module>()
>      18 counts = line.reduceByKey(lambda a,b: a+b)
>      19 print counts.collect()
> ---> 20 counts.saveAsHadoopFile("hdfs://192.168.10.130:9000/dev/output/test",
>             "org.apache.hadoop.mapred.SequenceFileOutputFormat")
>
> /mydata/softwares/spark-1.6.1/python/pyspark/rdd.pyc in saveAsHadoopFile(self,
> path, outputFormatClass, keyClass, valueClass, keyConverter, valueConverter,
> conf, compressionCodecClass)
>    1419                                         keyClass, valueClass,
>    1420                                         keyConverter, valueConverter,
> -> 1421                                         jconf, compressionCodecClass)
>    1422
>    1423     def saveAsSequenceFile(self, path, compressionCodecClass=None):
>
> /mydata/softwares/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py
> in __call__(self, *args)
>     811         answer = self.gateway_client.send_command(command)
>     812         return_value = get_return_value(
> --> 813             answer, self.gateway_client, self.target_id, self.name)
>     814
>     815         for temp_arg in temp_args:
>
> /mydata/softwares/spark-1.6.1/python/pyspark/sql/utils.pyc in deco(*a, **kw)
>      43     def deco(*a, **kw):
>      44         try:
> ---> 45             return f(*a, **kw)
>      46         except py4j.protocol.Py4JJavaError as e:
>      47             s = e.java_exception.toString()
>
> /mydata/softwares/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/protocol.py
> in get_return_value(answer, gateway_client, target_id, name)
>     306                 raise Py4JJavaError(
>     307                     "An error occurred while calling {0}{1}{2}.\n".
> --> 308                     format(target_id, ".", name), value)
>     309             else:
>     310                 raise Py4JError(
>
> Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.saveAsHadoopFile.
> : org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory
> hdfs://192.168.10.130:9000/dev/output/test already exists
>     at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1179)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1156)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1060)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
>     at org.apache.spark.api.python.PythonRDD$.saveAsHadoopFile(PythonRDD.scala:753)
>     at org.apache.spark.api.python.PythonRDD.saveAsHadoopFile(PythonRDD.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:483)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
>     at py4j.Gateway.invoke(Gateway.java:259)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.GatewayConnection.run(GatewayConnection.java:209)
>     at java.lang.Thread.run(Thread.java:745)
> ```
>
> Can you help me? Thanks!
> --
> Thanks & Best Regards
> 卢文泉 | Adolph Lu
> TEL: +86 15651006559
> Linker Networks (http://www.linkernetworks.com/)
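One more note on your quoted job: since the pairs are just (unicode, int), saveAsSequenceFile is the shorter route to the same file format, and sc.sequenceFile lets you check the round trip. A quick sketch, assuming the save above succeeded:

```
# Shorter equivalent of saveAsHadoopFile(..., SequenceFileOutputFormat):
# counts.saveAsSequenceFile("hdfs://192.168.10.130:9000/dev/output/test")

# Read the saved SequenceFile back; PySpark converts the Writables back
# to Python objects, so this should print your original counts.
saved = sc.sequenceFile("hdfs://192.168.10.130:9000/dev/output/test")
print saved.collect()  # expect [(u'2016-02-29', 99), (u'2016-03-02', 30)]
```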