I have a large series of atomic facts as JSON in a SequenceFile (Timestamp -> JSON).
Each fact looks like:

    {
      "asserted_at": "2014-02-13T16:36:33+00:00",
      "subject": "load_managers/9393cec2-f77b-4e6e-850f-d0ff53eedc82",
      "property": "price_specification",
      "value": {"currency": "USD", "amount": 43},
      "nonce": "C2PrgY6YF3CwQXAuvIlSJw=="
    }

The goal is a collated set of property-to-value mappings for each subject, preferring the newer fact over the older one when there are duplicates for a given property, in a format like:

    {
      "subject": "load_managers/9393cec2-f77b-4e6e-850f-d0ff53eedc82",
      "price_specification": {"currency":.....},
      "manages": "newest-nodes/1d74426f-2b0a-4777-ac1b-042268cab09c"
    }

I've gotten a significant portion of the way. With the script below I basically have the data I want, but I can't for the life of me figure out how to render it back into that format. For each subject, I now have a bag of tuples holding only the newest facts (i.e., the older copies of duplicates have been thrown away). It looks like:

    {(price_specification,[amount#49,currency#USD]),(manages,newest-nodes/1d74426f-2b0a-4777-ac1b-042268cab09c)}

One additional hurdle is that I need to keep support for the nested maps in the value fields (or nested bags, or... whatever, as long as the end result is right).

*How do I recombine that into the format I want? Do I need a UDF for this?* If so, I'm going to need a bit of hand-holding to figure out the Pig UDF API and how to package a UDF for use.
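To pin down the intended semantics, here is a plain-Python reference model of the collation described above. It is not Pig and not the script's actual code: it keeps the newest fact per (subject, property) pair, then folds the survivors into one record per subject. It assumes the facts are already parsed into dicts with the keys shown above. Like the `ORDER ... BY asserted_at DESC` step in the script, it compares `asserted_at` as a string, which orders correctly only while every timestamp uses the same ISO 8601 layout and the same `+00:00` offset.

```python
from collections import OrderedDict


def collate(facts):
    """Collapse atomic facts into one record per subject.

    For each (subject, property) pair only the fact with the greatest
    asserted_at survives; its value (possibly a nested dict) is copied
    into the subject's record unchanged.
    """
    newest = {}  # (subject, property) -> newest fact seen so far
    for fact in facts:
        key = (fact["subject"], fact["property"])
        current = newest.get(key)
        # Assumption: ISO 8601 strings with a uniform offset sort chronologically.
        if current is None or fact["asserted_at"] > current["asserted_at"]:
            newest[key] = fact

    records = OrderedDict()  # subject -> collated record, in sorted key order
    for (subject, prop), fact in sorted(newest.items(), key=lambda kv: kv[0]):
        record = records.setdefault(subject, {"subject": subject})
        record[prop] = fact["value"]
    return list(records.values())
```

Because only the greatest `asserted_at` wins for each pair, the result does not depend on the order in which the facts arrive, which mirrors what the GROUP / ORDER / LIMIT 1 steps in the script compute.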
    REGISTER 'rubyudf.rb' USING jruby AS json;
    REGISTER /home/vagrant/pig/trunk/contrib/piggybank/java/piggybank.jar;
    DEFINE SequenceFileLoader org.apache.pig.piggybank.storage.SequenceFileLoader();
    REGISTER 'bagToMap.py' USING jython AS myfuncs;
    REGISTER json-simple-1.1.jar;
    REGISTER pig-to-json.jar;

    facts = LOAD 'hdfs://localhost:9000/flume/FlumeData.1392309393792'
        USING SequenceFileLoader AS (timestamp:long, jsonText:chararray);

    jsons = FOREACH facts {
        hash = json.load(jsonText);
        GENERATE hash#'asserted_at' AS asserted_at:chararray,
                 hash#'subject' AS subject:chararray,
                 hash#'property' AS property:chararray,
                 hash#'value' AS value:map[];
    }

    bySubjectAndProperty = GROUP jsons BY (subject, property);

    newestFacts = FOREACH bySubjectAndProperty {
        sorted = ORDER jsons BY asserted_at DESC;
        singleNewFact = LIMIT sorted 1;
        GENERATE FLATTEN(singleNewFact);
    }

    upToDateFactsBySubject = GROUP newestFacts BY subject;

    entitiesPropValTuples = FOREACH upToDateFactsBySubject {
        x = FOREACH newestFacts GENERATE property, value;
        GENERATE x;
    }

    ILLUSTRATE entitiesPropValTuples;

    --mapped = FOREACH entitiesPropValTuples GENERATE myfuncs.BagtoMap(x) AS y;
    --jsoned = FOREACH entitiesPropValTuples GENERATE com.hortonworks.pig.udf.ToJson(x) AS final:chararray;
    --jsoned = FOREACH mapped GENERATE json.dump(y);

    limited = LIMIT entitiesPropValTuples 100;
    STORE limited INTO 'pigoutput32';
    -- ILLUSTRATE jsoned;

*THANKS!*

*- Chris*
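The commented-out `myfuncs.BagtoMap(x)` line suggests a Jython UDF for the last step. Below is a minimal sketch of what `bagToMap.py` could contain. It assumes Pig's Jython type mapping (a bag arrives as a list of tuples, a map as a dict) and that Pig supplies the `@outputSchema` decorator when the file is registered `USING jython`. The optional `subject` parameter and the `record:map[]` schema are hypothetical additions. The fallback decorator exists only so the file can also be imported and tested outside Pig.

```python
# bagToMap.py: hypothetical contents for the file registered AS myfuncs.
try:
    outputSchema  # defined by Pig when it loads this file USING jython
except NameError:
    def outputSchema(schema):
        """No-op stand-in so the module can be imported and tested outside Pig."""
        def decorate(func):
            return func
        return decorate


@outputSchema("record:map[]")
def BagtoMap(bag, subject=None):
    """Fold a bag of (property, value) tuples into a single map.

    Values are passed through untouched, so a nested map such as
    [amount#49,currency#USD] (a dict on the Python side) comes back
    out as a nested map.
    """
    record = {}
    if subject is not None:
        record["subject"] = subject  # hypothetical: carry the GROUP key along
    for fact in bag or []:  # a null bag arrives as None
        record[fact[0]] = fact[1]
    return record
```

Called as `myfuncs.BagtoMap(newestFacts.(property, value), group)` inside a `FOREACH upToDateFactsBySubject GENERATE ...`, it would emit one map per subject, including the `subject` key. The existing `json.dump(y)` line could then serialize that map, although whether `rubyudf.rb` renders nested Pig maps correctly depends on code that isn't shown here.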