Hi, all: Below is my code: from pyspark import *import re def getDateByLine(input_str): str_pattern = '^\d{4}-\d{2}-\d{2}' pattern = re.compile(str_pattern) match = pattern.match(input_str) if match: return match.group() else: return None
# Count log lines per date: key each dated line by its YYYY-MM-DD prefix,
# sum the counts, and write the result to HDFS as a Hadoop SequenceFile.
file_url = "hdfs://192.168.10.130:9000/dev/test/test.log"
input_file = sc.textFile(file_url)
# Keep only lines beginning with a date (getDateByLine returns None, i.e.
# falsy, otherwise), then key each surviving line by its first 10 chars
# (the 'YYYY-MM-DD' prefix) with a count of 1.
line = input_file.filter(getDateByLine).map(lambda x: (x[:10], 1))
counts = line.reduceByKey(lambda a,b: a+b)
print counts.collect()
# The second argument is the fully-qualified name of the Java OutputFormat
# class Hadoop uses to write the records; SequenceFileOutputFormat stores
# the (date, count) pairs as a binary Hadoop SequenceFile under the given
# directory.
# NOTE(review): the traceback below is a FileAlreadyExistsException --
# Hadoop's FileOutputFormat refuses to write into an existing directory,
# so the target path must be removed (or changed) before each run.
counts.saveAsHadoopFile("hdfs://192.168.10.130:9000/dev/output/test",\
    "org.apache.hadoop.mapred.SequenceFileOutputFormat")

What I confused is the method *saveAsHadoopFile*,I have read the pyspark API, But I still don’t understand the second arg mean Below is the output when I run above code: ``` [(u'2016-02-29', 99), (u'2016-03-02', 30)] ---------------------------------------------------------------------------Py4JJavaError Traceback (most recent call last)<ipython-input-6-0a091f21bf99> in <module>() 18 counts = line.reduceByKey(lambda a,b: a+b) 19 print counts.collect()---> 20 counts.saveAsHadoopFile("hdfs://192.168.10.130:9000/dev/output/test", "org.apache.hadoop.mapred.SequenceFileOutputFormat") /mydata/softwares/spark-1.6.1/python/pyspark/rdd.pyc in saveAsHadoopFile(self, path, outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, compressionCodecClass) 1419 keyClass, valueClass, 1420 keyConverter, valueConverter,-> 1421 jconf, compressionCodecClass) 1422 1423 def saveAsSequenceFile(self, path, compressionCodecClass=None): /mydata/softwares/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 811 answer = self.gateway_client.send_command(command) 812 return_value = get_return_value(--> 813 answer, self.gateway_client, self.target_id, self.name) 814 815 for temp_arg in temp_args: /mydata/softwares/spark-1.6.1/python/pyspark/sql/utils.pyc in deco(*a, **kw) 43 def deco(*a, **kw): 44 try:---> 45 return f(*a, **kw) 46 except py4j.protocol.Py4JJavaError as e: 47 s = e.java_exception.toString() /mydata/softwares/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 306 raise Py4JJavaError( 307 "An error occurred while calling 
{0}{1}{2}.\n".--> 308 format(target_id, ".", name), value) 309 else: 310 raise Py4JError( Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsHadoopFile. : org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://192.168.10.130:9000/dev/output/test already exists at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1179) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1156) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1060) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026) at org.apache.spark.api.python.PythonRDD$.saveAsHadoopFile(PythonRDD.scala:753) at org.apache.spark.api.python.PythonRDD.saveAsHadoopFile(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:209) at java.lang.Thread.run(Thread.java:745) ``` Can you help me? Thanks! -- Thanks & Best Regards 卢文泉 | Adolph Lu TEL:+86 15651006559 Linker Networks(http://www.linkernetworks.com/)