Anchit, please ignore my inputs. You are right. Thanks.
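(A note for readers of this thread: the "string manipulation" step Anchit suggests below for deriving 'source' and 'date' can be sketched in plain Python. The `/data/<source>/dt=<date>/` path layout is assumed from Fengdong's examples; adjust the index arithmetic to your actual directory scheme.)

```python
import json

def add_origin(path, line):
    # Derive "source" and "date" from an HDFS path shaped like
    # hdfs://host:9000/data/<source>/dt=<date>/part-00000 -- this layout
    # is assumed from the examples quoted below; adjust to your own scheme.
    parts = path.rstrip("/").split("/")
    source = parts[-3]                 # e.g. "test1"
    date = parts[-2].split("=", 1)[1]  # "dt=20100101" -> "20100101"
    record = json.loads(line)
    record["source"] = source
    record["date"] = date
    return json.dumps(record)
```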
> On Sep 26, 2015, at 17:27, Fengdong Yu <fengdo...@everstring.com> wrote:
>
> Hi Anchit,
>
> this is not what I expected, because you specified the HDFS directory in your
> code. I've solved it like this:
>
> val text = sc.hadoopFile(Args.input,
>   classOf[TextInputFormat], classOf[LongWritable],
>   classOf[Text], 2)
> val hadoopRdd = text.asInstanceOf[HadoopRDD[LongWritable, Text]]
>
> hadoopRdd.mapPartitionsWithInputSplit((inputSplit, iterator) => {
>   val file = inputSplit.asInstanceOf[FileSplit]
>   iterator.map(tp => (tp._1, new Text(file.toString + "," + tp._2.toString)))
> })
>
>> On Sep 25, 2015, at 13:12, Anchit Choudhry <anchit.choud...@gmail.com> wrote:
>>
>> Hi Fengdong,
>>
>> So I created two files in HDFS under a test folder.
>>
>> test/dt=20100101.json
>> { "key1" : "value1" }
>>
>> test/dt=20100102.json
>> { "key2" : "value2" }
>>
>> Then inside the PySpark shell:
>>
>> rdd = sc.wholeTextFiles('./test/*')
>> rdd.collect()
>> [(u'hdfs://localhost:9000/user/hduser/test/dt=20100101.json', u'{ "key1" : "value1" }'),
>>  (u'hdfs://localhost:9000/user/hduser/test/dt=20100102.json', u'{ "key2" : "value2" }')]
>>
>> import json
>> def editMe(y, x):
>>     j = json.loads(y)
>>     j['source'] = x
>>     return j
>>
>> rdd.map(lambda (x, y): editMe(y, x)).collect()
>> [{'source': u'hdfs://localhost:9000/user/hduser/test/dt=20100101.json', u'key1': u'value1'},
>>  {u'key2': u'value2', 'source': u'hdfs://localhost:9000/user/hduser/test/dt=20100102.json'}]
>>
>> Similarly, you could modify the function to return 'source' and 'date' with
>> some string manipulation per your requirements.
>>
>> Let me know if this helps.
>>
>> Thanks,
>> Anchit
>>
>> On 24 September 2015 at 23:55, Fengdong Yu <fengdo...@everstring.com> wrote:
>>
>> yes.
>> Such as, I have two data sets:
>>
>> data set A: /data/test1/dt=20100101
>> data set B: /data/test2/dt=20100202
>>
>> all data has the same JSON format, such as:
>> {"key1" : "value1", "key2" : "value2"}
>>
>> my expected output:
>> {"key1" : "value1", "key2" : "value2", "source" : "test1", "date" : "20100101"}
>> {"key1" : "value1", "key2" : "value2", "source" : "test2", "date" : "20100202"}
>>
>>> On Sep 25, 2015, at 11:52, Anchit Choudhry <anchit.choud...@gmail.com> wrote:
>>>
>>> Sure. May I ask for a sample input (could be just a few lines) and the output
>>> you are expecting, to bring clarity to my thoughts?
>>>
>>> On Thu, Sep 24, 2015, 23:44 Fengdong Yu <fengdo...@everstring.com> wrote:
>>>
>>> Hi Anchit,
>>>
>>> Thanks for the quick answer.
>>>
>>> My exact question is: I want to add the HDFS location into each line in my
>>> JSON data.
>>>
>>>> On Sep 25, 2015, at 11:25, Anchit Choudhry <anchit.choud...@gmail.com> wrote:
>>>>
>>>> Hi Fengdong,
>>>>
>>>> Thanks for your question.
>>>>
>>>> Spark already has a function called wholeTextFiles on sparkContext
>>>> which can help you with that:
>>>>
>>>> Python
>>>>
>>>> hdfs://a-hdfs-path/part-00000
>>>> hdfs://a-hdfs-path/part-00001
>>>> ...
>>>> hdfs://a-hdfs-path/part-nnnnn
>>>>
>>>> rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")
>>>>
>>>> (a-hdfs-path/part-00000, its content)
>>>> (a-hdfs-path/part-00001, its content)
>>>> ...
>>>> (a-hdfs-path/part-nnnnn, its content)
>>>>
>>>> More info: http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=wholetext#pyspark.SparkContext.wholeTextFiles
>>>>
>>>> ------------
>>>>
>>>> Scala
>>>>
>>>> val rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")
>>>>
>>>> More info:
>>>> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext@wholeTextFiles(String,Int):RDD[(String,String)]
>>>>
>>>> Let us know if this helps or you need more help.
>>>>
>>>> Thanks,
>>>> Anchit Choudhry
>>>>
>>>> On 24 September 2015 at 23:12, Fengdong Yu <fengdo...@everstring.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I have multiple files in JSON format, such as:
>>>>
>>>> /data/test1_data/sub100/test.data
>>>> /data/test2_data/sub200/test.data
>>>>
>>>> I can sc.textFile("/data/*/*"),
>>>>
>>>> but I want to add {"source" : "HDFS_LOCATION"} to each line, then save
>>>> it to one target HDFS location.
>>>>
>>>> How to do it? Thanks.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: dev-h...@spark.apache.org
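(A side note for readers on Python 3: the `lambda (x, y):` tuple-parameter syntax in Anchit's PySpark snippet above was removed in Python 3, so the tuple must be unpacked explicitly inside the lambda. Here is a minimal sketch of the same flow, using an in-memory list in place of a live `wholeTextFiles` RDD; the paths are the illustrative ones from the thread, not real cluster paths.)

```python
import json

# Stand-in for sc.wholeTextFiles(...).collect(): (path, content) pairs,
# as shown earlier in the thread.
pairs = [
    ("hdfs://localhost:9000/user/hduser/test/dt=20100101.json",
     '{ "key1" : "value1" }'),
    ("hdfs://localhost:9000/user/hduser/test/dt=20100102.json",
     '{ "key2" : "value2" }'),
]

def edit_me(content, path):
    # Parse one JSON line and tag it with the file it came from.
    j = json.loads(content)
    j["source"] = path
    return j

# Python 3: unpack the (path, content) tuple via indexing; on a real RDD
# this would be rdd.map(lambda kv: edit_me(kv[1], kv[0])).
tagged = [(lambda kv: edit_me(kv[1], kv[0]))(p) for p in pairs]
```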