I have a large series of atomic facts as JSON in a SequenceFile (Timestamp
-> JSON).

Each fact looks like:

{
  "asserted_at": "2014-02-13T16:36:33+00:00",
  "subject": "load_managers/9393cec2-f77b-4e6e-850f-d0ff53eedc82",
  "property": "price_specification",
  "value": {"currency": "USD", "amount": 43},
  "nonce": "C2PrgY6YF3CwQXAuvIlSJw=="
}

The goal is a collated set of property to value mappings for each subject,
preferring a newer fact over an older one if there are duplicates for a
given property, in some format like:

{
  "subject": "load_managers/9393cec2-f77b-4e6e-850f-d0ff53eedc82",
  "price_specification": {"currency": .....},
  "manages": "newest-nodes/1d74426f-2b0a-4777-ac1b-042268cab09c"
}
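To make the target transformation concrete, here is a minimal plain-Python sketch of the "newest fact wins, then collate per subject" logic I'm after. The second and third facts below are invented samples (their timestamps and values are illustrative, not real data); note that ISO-8601 timestamps with a fixed offset compare correctly as plain strings, so "newer" is just a string comparison on asserted_at.

```python
import json

# Sample facts in the shape shown above. The second price_specification
# fact is newer and should win; the manages fact is a plain string value.
facts = [
    {"asserted_at": "2014-02-13T16:36:33+00:00",
     "subject": "load_managers/9393cec2-f77b-4e6e-850f-d0ff53eedc82",
     "property": "price_specification",
     "value": {"currency": "USD", "amount": 43}},
    {"asserted_at": "2014-02-14T09:00:00+00:00",
     "subject": "load_managers/9393cec2-f77b-4e6e-850f-d0ff53eedc82",
     "property": "price_specification",
     "value": {"currency": "USD", "amount": 49}},
    {"asserted_at": "2014-02-13T18:00:00+00:00",
     "subject": "load_managers/9393cec2-f77b-4e6e-850f-d0ff53eedc82",
     "property": "manages",
     "value": "newest-nodes/1d74426f-2b0a-4777-ac1b-042268cab09c"},
]

def collate(facts):
    # Keep only the newest fact per (subject, property) pair.
    newest = {}
    for f in facts:
        key = (f["subject"], f["property"])
        if key not in newest or f["asserted_at"] > newest[key]["asserted_at"]:
            newest[key] = f
    # Fold the surviving facts into one property->value dict per subject;
    # nested dict values pass through untouched, preserving nesting.
    out = {}
    for (subject, prop), f in newest.items():
        out.setdefault(subject, {"subject": subject})[prop] = f["value"]
    return list(out.values())

print(json.dumps(collate(facts), indent=2))
```

This is only the logic in miniature; the Pig script below does the same thing distributed (GROUP + ORDER + LIMIT for "newest wins", then a second GROUP per subject).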

I've gotten a significant portion of the way there. The script is below; I
basically have the data I want, but can't for the life of me figure out how
to render it back into the format I want.


I've gotten to a point where, for each subject, I have a bag of tuples
containing only the newest facts (i.e., the older copy of any duplicate is
thrown away) that looks like:

{(price_specification,[amount#49,currency#USD]),(manages,newest-nodes/1d74426f-2b0a-4777-ac1b-042268cab09c)}

One additional hurdle: I need to preserve the nested maps in the value
fields (or nested bags, or whatever, as long as the end result is right).

* How do I recombine that into the format I want? Do I need a UDF for this?
* If so, I'm going to need a bit of hand-holding to figure out the Pig UDF
  API and how to package it up for use.
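For what it's worth, here is a sketch of what bagToMap.py (referenced in the script below as myfuncs.BagtoMap) might look like as a Jython UDF. This is an assumption about the intended implementation, not tested code: in a Jython UDF a Pig bag arrives as a list of tuples, so collapsing {(property, value), ...} into a map is a small loop. The outputSchema decorator comes from the pig_util module that Pig makes available to Jython UDFs; the try/except fallback is only there so the function can be exercised outside Pig.

```python
# bagToMap.py -- hypothetical sketch of the Jython UDF the script registers.
try:
    # Available when the script runs under Pig's embedded Jython.
    from pig_util import outputSchema
except ImportError:
    # No-op stand-in so the function can be tested outside Pig.
    def outputSchema(schema):
        def wrap(f):
            return f
        return wrap

@outputSchema('m:map[]')
def BagtoMap(bag):
    # bag is a list of (property, value) tuples; a value that is itself a
    # map arrives as a dict and passes through unchanged, so nesting is
    # preserved.
    result = {}
    for prop, value in bag:
        result[prop] = value
    return result
```

If that's roughly right, the remaining step would be serializing the resulting map (plus the subject) back to JSON, which is what the commented-out ToJson / json.dump lines in the script are groping toward.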


REGISTER 'rubyudf.rb' USING jruby AS json;
REGISTER 'bagToMap.py' USING jython AS myfuncs;
REGISTER /home/vagrant/pig/trunk/contrib/piggybank/java/piggybank.jar;
REGISTER json-simple-1.1.jar;
REGISTER pig-to-json.jar;

DEFINE SequenceFileLoader
org.apache.pig.piggybank.storage.SequenceFileLoader();

facts = LOAD 'hdfs://localhost:9000/flume/FlumeData.1392309393792' USING
SequenceFileLoader AS (timestamp:long, jsonText:chararray);

jsons = FOREACH facts {
  hash = json.load(jsonText);
  GENERATE hash#'asserted_at'        AS asserted_at:chararray,
           hash#'subject'            AS subject:chararray,
           hash#'property'           AS property:chararray,
           hash#'value'              AS value:map[];
  }

bySubjectAndProperty = GROUP jsons BY (subject, property);

newestFacts = FOREACH bySubjectAndProperty {
  sorted = ORDER jsons BY asserted_at DESC;
  singleNewFact = LIMIT sorted 1;
  GENERATE FLATTEN(singleNewFact);
}

upToDateFactsBySubject = GROUP newestFacts BY subject;

entitiesPropValTuples = FOREACH upToDateFactsBySubject {
  x = FOREACH newestFacts GENERATE property, value;
  GENERATE x;
}

ILLUSTRATE entitiesPropValTuples;


--mapped = FOREACH entitiesPropValTuples GENERATE myfuncs.BagtoMap(x) AS y;
--jsoned = FOREACH entitiesPropValTuples GENERATE com.hortonworks.pig.udf.ToJson(x) AS final:chararray;
--jsoned = FOREACH mapped GENERATE json.dump(y);

limited = LIMIT entitiesPropValTuples 100;
STORE limited INTO 'pigoutput32';

-- ILLUSTRATE jsoned;





*THANKS!*

*- Chris*
