Siva,

Try the latest build of elasticsearch-hadoop, version 2.1.0 Beta 2:
http://www.elasticsearch.org/overview/hadoop/download/

The esRDD has been changed to Spark's PairRDD:
https://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions

The RDD will now contain key/value tuples of the form (String, Map[String, Any]), so you could start to walk the JSON key/value hierarchy with something like:

esRDD.flatMap { args => args._2.get("aggregations") }

(The syntax above is not exact, since your specific query result may have a different key/value pair as its first object.)

Best,
Jeff Steinmetz
Director of Data Science
Ekho, Inc.
www.ekho.me
@jeffsteinmetz

On Wednesday, September 17, 2014 6:13:37 AM UTC-7, siva pradeep wrote:
>
> Hi,
>
> I have a query which filters the rows and then applies an aggregation. I
> tried running the query in "Sense" and it gave me the expected result. But
> when I run the same query using elasticsearch-spark_2.10, I get the rows
> filtered by the query but not the aggregation result. I am sure I am
> missing something but am unable to figure out what.
>
> Here is the query:
>
> GET _search
> {
>   "query": {
>     "bool": {
>       "must": [
>         {
>           "filtered": {
>             "query": {
>               "range": {
>                 "@timestamp": {
>                   "from": "2014-09-03T01:40:37.437Z",
>                   "to": "2014-09-03T01:45:11.437Z"
>                 }
>               }
>             }
>           }
>         }
>       ]
>     }
>   },
>   "size": 0,
>   "fields": ["cid", "entity"],
>   "aggs": {
>     "cid": {
>       "terms": {
>         "field": "cid",
>         "min_doc_count": 2,
>         "size": 100
>       },
>       "aggs": {
>         "tn": {
>           "terms": {
>             "field": "entity"
>           }
>         }
>       }
>     }
>   }
> }
>
> Query result:
>
> {
>   "took": 10005,
>   "timed_out": false,
>   "_shards": {
>     "total": 10,
>     "successful": 10,
>     "failed": 0
>   },
>   "hits": {
>     "total": 2430,
>     "max_score": 0,
>     "hits": []
>   },
>   "aggregations": {
>     "cid": {
>       "buckets": [
>         {
>           "key": "01abcecc9a20cd3d6ae6be3509d014ba@76.96.107.168",
>           "doc_count": 2,
>           "tn": {
>             "buckets": [
>               {
>                 "key": "15052563268",
>                 "doc_count": 2
>               }
>             ]
>           }
>         }
>       ]
>     }
>   }
> }
>
> Spark program:
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.elasticsearch.spark._
>
> object PresenceFilter extends App {
>
>   val query: String =
>     """{
>       |  "query": {
>       |    "bool": {
>       |      "must": [
>       |        { "filtered": { "query": { "range": { "@timestamp": {
>       |          "from": "2014-09-03T01:40:37.437Z",
>       |          "to": "2014-09-03T01:45:11.437Z"
>       |        } } } } }
>       |      ]
>       |    }
>       |  },
>       |  "size": 0,
>       |  "fields": ["cid", "entity"],
>       |  "aggs": {
>       |    "cid": {
>       |      "terms": { "field": "cid", "min_doc_count": 2, "size": 100 },
>       |      "aggs": {
>       |        "tn": { "terms": { "field": "entity" } }
>       |      }
>       |    }
>       |  }
>       |}""".stripMargin
>
>   val sparkConf = new SparkConf()
>     .setAppName("PresenceAnalysis")
>     .setMaster("local[4]")
>     .set("es.nodes", "prs-wch-10.sys.comcast.net")
>     .set("es.port", "9200")
>     .set("es.resource", "presence-2014.09.03/presence")
>     .set("es.endpoint", "_search")
>     // .set("es.query", query)
>   val sc = new SparkContext(sparkConf)
> }
>
> sc.esRDD.count returns 2430 rows.
>
> How do I get the aggregation part of the result (shown below) into the
> program?
>
>   "aggregations": {
>     "cid": {
>       "buckets": [
>         {
>           "key": "01abcecc9a20cd3d6ae6be3509d014ba@76.96.107.168",
>           "doc_count": 2,
>           "tn": {
>             "buckets": [
>               {
>                 "key": "15052563268",
>                 "doc_count": 2
>               }
>             ]
>           }
>         }
>
> Please advise.
>
> Thanks,
> Siva P
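The sketch below follows the flatMap suggestion above and walks the cid/tn buckets from the query result into flat rows. It rests on an assumption: the aggregation response would have to arrive as nested Map[String, Any] values inside each pair's second element, and that is not confirmed here. Other simplifications:

- A plain Seq stands in for the RDD, so the code runs without Spark or Elasticsearch.
- JSON arrays are assumed to arrive as a Seq.
- The pair key "response", the type alias Doc, and the helpers aggregationsOf, buckets, cidEntityCounts and rowsOf are hypothetical names chosen for illustration.

```scala
// Sketch only: a plain Seq stands in for the RDD returned by esRDD, and the
// element shape (String, Map[String, Any]) follows the reply above.
// Whether aggregations actually appear in that Map is an assumption.
object WalkAggregations {

  // Hypothetical alias for one element of the pair RDD.
  type Doc = (String, Map[String, Any])

  // Keep only the "aggregations" entry of each pair. Map.get returns an
  // Option, and flatMap drops the pairs where it is None.
  def aggregationsOf(docs: Seq[Doc]): Seq[Any] =
    docs.flatMap { case (_, fields) => fields.get("aggregations") }

  // Return the "buckets" list of a terms aggregation, or an empty Seq if the
  // value does not have that shape (assumes JSON arrays arrive as a Seq).
  def buckets(agg: Any): Seq[Map[String, Any]] = agg match {
    case m: Map[_, _] =>
      m.asInstanceOf[Map[String, Any]].get("buckets") match {
        case Some(bs: Seq[_]) =>
          bs.collect { case b: Map[_, _] => b.asInstanceOf[Map[String, Any]] }
        case _ => Seq.empty
      }
    case _ => Seq.empty
  }

  // Flatten the nested cid -> tn buckets into (cid, entity, doc_count) rows.
  // Assumes every bucket carries "key" and "doc_count", as terms buckets do.
  def cidEntityCounts(aggs: Map[String, Any]): Seq[(String, String, Long)] =
    for {
      cid <- buckets(aggs.getOrElse("cid", Map.empty))
      tn  <- buckets(cid.getOrElse("tn", Map.empty))
    } yield (cid("key").toString, tn("key").toString, tn("doc_count").toString.toLong)

  // Combine both steps over every pair that carries aggregations.
  def rowsOf(docs: Seq[Doc]): Seq[(String, String, Long)] =
    aggregationsOf(docs).flatMap {
      case m: Map[_, _] => cidEntityCounts(m.asInstanceOf[Map[String, Any]])
      case _            => Seq.empty
    }

  // Hypothetical sample mirroring the query result in the question; the
  // pair key "response" is made up for illustration.
  val sample: Seq[Doc] = Seq(
    ("response", Map(
      "aggregations" -> Map(
        "cid" -> Map("buckets" -> Seq(
          Map(
            "key" -> "01abcecc9a20cd3d6ae6be3509d014ba@76.96.107.168",
            "doc_count" -> 2,
            "tn" -> Map("buckets" -> Seq(
              Map("key" -> "15052563268", "doc_count" -> 2)))))))))),
    ("doc-1", Map("cid" -> "x", "entity" -> "y")) // a plain hit, no aggregations
  )

  def main(args: Array[String]): Unit =
    rowsOf(sample).foreach { case (cid, entity, count) =>
      println(s"$cid\t$entity\t$count")
    }
}
```

On an actual RDD[(String, Map[String, Any])], the same pattern matching could go inside rdd.flatMap. The helpers only touch plain Scala collections and do no I/O.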