ElasticSearch Acknowledgements

2015-02-02 Thread Abhishek Patel
Previously, I was using a Storm topology to build the pipeline for my use 
case: read from a Kafka topic through a Kafka spout, process the tuples in 
a bolt, then emit them to an Elasticsearch bolt (EsBolt) to index them. In 
the Storm topology, I was able to acknowledge each tuple I was indexing, and 
these acknowledgements were sent from EsBolt to the spout and finally to 
ZooKeeper.

Later I realised that Storm is not the appropriate component for my use 
case, so I switched to Spark. I am doing the same thing with Spark: reading 
from a Kafka topic, reading the messages in a Spark stream, passing this 
stream to a component that converts it to RDDs, and finally indexing these 
RDDs. But now there is no mechanism for acknowledgements, and I still want 
to send an acknowledgement to the previous component for each message 
being indexed.

My code looks like this:

JavaEsSpark.saveJsonToEs(jrd, "index_{date}/type");

How can I get acknowledgements in the Spark-Elasticsearch framework?
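Spark has no per-tuple ack channel like Storm's, but a comparable guarantee can usually be had per batch. `saveJsonToEs` is a Spark action, so it returns only after the write job has finished and throws if the job fails; acknowledging (for example, committing the Kafka offsets for that batch) only after it returns gives at-least-once delivery. The JDK-only sketch below shows that pattern. `BatchAcknowledger`, its nested `BatchWriter` interface and the in-memory offset are hypothetical stand-ins for the real Elasticsearch write and the real offset store (such as ZooKeeper); they are not part of es-hadoop.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class BatchAcknowledger {
    // Hypothetical stand-in for a blocking write such as
    // JavaEsSpark.saveJsonToEs(rdd, "index/type"): returns normally on success, throws on failure.
    interface BatchWriter {
        void write(List<String> jsonDocs) throws Exception;
    }

    // Hypothetical offset store; -1 means nothing has been acknowledged yet.
    private final AtomicLong committedOffset = new AtomicLong(-1);
    private final BatchWriter writer;

    BatchAcknowledger(BatchWriter writer) {
        this.writer = writer;
    }

    // Writes one batch and "acks" it by committing lastOffset only if the write
    // returned without throwing. Returns true if the batch was acknowledged.
    boolean writeAndAck(List<String> jsonDocs, long lastOffset) {
        try {
            writer.write(jsonDocs);
        } catch (Exception e) {
            // Not acknowledged: the offset stays where it was, so the batch can be replayed.
            return false;
        }
        committedOffset.set(lastOffset);
        return true;
    }

    long committedOffset() {
        return committedOffset.get();
    }

    public static void main(String[] args) {
        BatchAcknowledger ok = new BatchAcknowledger(docs -> { /* simulated successful write */ });
        System.out.println(ok.writeAndAck(List.of("{\"a\":1}"), 41L) + " " + ok.committedOffset());

        BatchAcknowledger failing = new BatchAcknowledger(docs -> {
            throw new RuntimeException("simulated bulk failure");
        });
        System.out.println(failing.writeAndAck(List.of("{\"a\":2}"), 42L) + " " + failing.committedOffset());
    }
}
```

Running `main` prints `true 41` followed by `false -1`. Because a failed batch may already be partly indexed when it is replayed, giving each document an id (es-hadoop's `es.mapping.id` setting) keeps replays from creating duplicates.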

You received this message because you are subscribed to the Google Groups 
"elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to elasticsearch+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
For more options, visit https://groups.google.com/d/optout.

Re: elasticsearch-hadoop for spark, index documents from a RDD in different index by day: myindex-2014-01-01 for example

2015-02-02 Thread Abhishek Patel
Hi Julien

Yes, that's probably the only possible workaround. What I am planning to 
do is calculate the name of the index prior to writing, add a field named 
"indexname" to my JSON, and then use

  JavaEsSpark.saveJsonToEs(jrd, "index_{indexname}/type");

Thanks for the reply.
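The workaround above (compute the index name first, store it in an `indexname` field, and let `{indexname}` in the resource pattern pick it up) can be sketched in plain Java. The document is modelled as a `Map` instead of a JSON string to avoid a JSON library; with one, you would add the field to the parsed object instead. `withIndexName` and `resolve` are hypothetical helpers, and `resolve` only imitates how a `{field}` placeholder is filled from each document; it is not es-hadoop's implementation. The daily name format (`yyyy-MM-dd` in UTC, as in the thread title's `myindex-2014-01-01`) is an assumption.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class IndexNameField {
    // Assumed naming scheme: one index per UTC day, e.g. "2015-02-02".
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{([^}]+)\\}");

    // Returns a copy of doc with an "indexname" field derived from its "time" field (Unix seconds).
    static Map<String, Object> withIndexName(Map<String, Object> doc) {
        long time = ((Number) doc.get("time")).longValue();
        Map<String, Object> copy = new HashMap<>(doc);
        copy.put("indexname", DAY.format(Instant.ofEpochSecond(time)));
        return copy;
    }

    // Illustration only: replaces each {field} in the pattern with that field's value from doc.
    static String resolve(String pattern, Map<String, Object> doc) {
        Matcher m = PLACEHOLDER.matcher(pattern);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            Object value = doc.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("missing field: " + m.group(1));
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value.toString()));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("time", 1422904680L);
        System.out.println(resolve("index_{indexname}/type", withIndexName(doc)));
    }
}
```

Running `main` prints `index_2015-02-02/type`.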



Re: elasticsearch-hadoop for spark, index documents from a RDD in different index by day: myindex-2014-01-01 for example

2015-02-02 Thread Abhishek Patel
Hi Julien,

I am trying to achieve something similar. In my case, my JSON contains a 
field "time" holding a Unix timestamp, and I want to partition my indexes 
by this field. That is, if JSON1 contains 1422904680 in "time" and JSON2 
contains 1422991080 in "time", then I want to create indexes that are 
partitioned by time (24 hours each), like

index_1422835200_1422921599 - which will contain JSON1 because the value of 
"time" (1422904680) falls in its range.
index_1422921600_1423007999 - which will contain JSON2 because the value of 
"time" (1422991080) falls in its range.

Now I want to create my indexes dynamically. It is also possible to 
receive a JSON document whose "time" value is earlier than the current date.

To achieve this, I need to create the index name dynamically, calculating 
it at write time from each document's own "time" value. Programmatically, 
I want to achieve something like this:

 JavaEsSpark.saveJsonToEs(jrd, "index_{ {time} - {time} % 86400 }_{ {time} - {time} % 86400 + 86399 }/type");

Is it possible to achieve dynamic index name creation as described above? 
If not, is there any workaround for my use case?
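The bucket boundaries are plain integer arithmetic on the Unix timestamp: a day starts at `time - time % 86400` and ends one second before the next day starts. A minimal sketch, assuming timestamps in seconds and UTC day boundaries (the class and method names are hypothetical). Nothing depends on the current clock, so documents with a "time" before today land in their own day's index just the same.

```java
class DayBucket {
    static final long SECONDS_PER_DAY = 86_400L;

    // Start of the UTC day containing t (Unix seconds).
    // Math.floorMod keeps this correct for timestamps before 1970 as well.
    static long dayStart(long t) {
        return t - Math.floorMod(t, SECONDS_PER_DAY);
    }

    // Builds e.g. "index_1422835200_1422921599": the first and last second of that day.
    static String indexName(long t) {
        long start = dayStart(t);
        long end = start + SECONDS_PER_DAY - 1;
        return "index_" + start + "_" + end;
    }

    public static void main(String[] args) {
        System.out.println(indexName(1422904680L)); // JSON1
        System.out.println(indexName(1422991080L)); // JSON2
    }
}
```

Running `main` prints `index_1422835200_1422921599` and `index_1422921600_1423007999`, matching the two example ranges. Storing that string in a field such as "indexname" lets a resource pattern like `index_{indexname}/type` select it per document.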



Unable to write in ElasticSearch using Spark in java (throws java.lang.IncompatibleClassChangeError: Implementing class exception)

2015-01-16 Thread Abhishek Patel
I am using a simple Java program to index a Spark JavaRDD into 
Elasticsearch. My code looks like this:

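import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

// The rest of this line (app name, master URL) was cut off in the original post.
SparkConf conf = new SparkConf();
conf.set("es.index.auto.create", "true");
conf.set("es.nodes", "");   // host value elided in the original post
conf.set("es.port", "9200");
JavaSparkContext sc = new JavaSparkContext(conf);

// Data is my own JavaBean (not shown here).
String arrayval = "string";
List<Data> data = Arrays.asList(
    new Data(1L, 10L, arrayval + "1"),
    new Data(2L, 20L, arrayval + "2"),
    new Data(3L, 30L, arrayval + "3"),
    new Data(4L, 40L, arrayval + "4"),
    new Data(5L, 50L, arrayval + "5"),
    new Data(6L, 60L, arrayval + "6"),
    new Data(7L, 70L, arrayval + "7"),
    new Data(8L, 80L, arrayval + "8"),
    new Data(9L, 90L, arrayval + "9"),
    new Data(10L, 100L, arrayval + "10"));

JavaRDD<Data> javaRDD = sc.parallelize(data);
JavaEsSpark.saveToEs(javaRDD, "index/type");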

Running the code above throws an exception (stack trace):

15/01/16 13:20:41 INFO spark.SecurityManager: Changing view acls to: root
15/01/16 13:20:41 INFO spark.SecurityManager: Changing modify acls to: root
15/01/16 13:20:41 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/01/16 13:20:41 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/01/16 13:20:41 INFO Remoting: Starting remoting
15/01/16 13:20:41 INFO Remoting: Remoting started; listening on addresses
15/01/16 13:20:41 INFO util.Utils: Successfully started service 'sparkDriver' on port 55586.
15/01/16 13:20:41 INFO spark.SparkEnv: Registering MapOutputTracker
15/01/16 13:20:41 INFO spark.SparkEnv: Registering BlockManagerMaster
15/01/16 13:20:41 INFO storage.DiskBlockManager: Created local directory at
15/01/16 13:20:41 INFO storage.MemoryStore: MemoryStore started with capacity 2.3 GB
15/01/16 13:20:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/01/16 13:20:41 INFO spark.HttpFileServer: HTTP File server directory is
15/01/16 13:20:42 INFO spark.HttpServer: Starting HTTP Server
15/01/16 13:20:42 INFO server.Server: jetty-8.1.14.v20131031
15/01/16 13:20:42 INFO server.AbstractConnector: Started
15/01/16 13:20:42 INFO util.Utils: Successfully started service 'HTTP file server' on port 34049.
15/01/16 13:20:42 INFO server.Server: jetty-8.1.14.v20131031
15/01/16 13:20:42 INFO server.AbstractConnector: Started
15/01/16 13:20:42 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
15/01/16 13:20:42 INFO ui.SparkUI: Started SparkUI at http://ct-0015:4040
15/01/16 13:20:42 INFO client.AppClient$ClientActor: Connecting to master
15/01/16 13:20:42 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150116131933-0078
15/01/16 13:20:42 INFO netty.NettyBlockTransferService: Server created on
15/01/16 13:20:42 INFO storage.BlockManagerMaster: Trying to register
15/01/16 13:20:42 INFO storage.BlockManagerMasterActor: Registering block manager ct-0015:34762 with 2.3 GB RAM, BlockManagerId(, ct-0015,
15/01/16 13:20:42 INFO storage.BlockManagerMaster: Registered BlockManager
15/01/16 13:20:42 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
15/01/16 13:20:43 INFO spark.SparkContext: Added JAR ./target/SparkPOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar at
with timestamp 1421394643161
Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:455)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:367)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader
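`IncompatibleClassChangeError: Implementing class` is raised while a class is being loaded, when a type that the class was compiled to implement as an interface turns out to be a class at runtime. That usually points to two incompatible versions of the same library on the classpath, for example a Spark or es-hadoop build that does not match what was bundled into the jar-with-dependencies. A JDK-only sketch for checking this lists every classpath entry that contains a given class file and reports where the class was actually loaded from. `ClasspathCheck` is a hypothetical helper, and which class names to pass in is up to you.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.security.CodeSource;
import java.util.Collections;
import java.util.List;

class ClasspathCheck {
    // Where the JVM actually loaded this class from; JDK bootstrap classes have no code source.
    static String locationOf(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "<bootstrap/JDK>";
        }
        return source.getLocation().toString();
    }

    // Every classpath entry containing a copy of the named class file.
    // More than one entry for the same class hints at conflicting library versions.
    static List<URL> copiesOf(String className) {
        String resource = className.replace('.', '/') + ".class";
        try {
            return Collections.list(ClasspathCheck.class.getClassLoader().getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Pass fully qualified class names (e.g. Spark or es-hadoop classes) as arguments.
        for (String name : args) {
            System.out.println(name + " found in " + copiesOf(name));
            try {
                // Load without running static initialisers, then report the winning copy.
                Class<?> cls = Class.forName(name, false, ClasspathCheck.class.getClassLoader());
                System.out.println("  loaded from " + locationOf(cls));
            } catch (ClassNotFoundException e) {
                System.out.println("  not on the classpath");
            }
        }
    }
}
```

Running it with the same classpath as the failing job shows whether a class appears in more than one jar.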

EsBolt - Schema Definition

2014-12-29 Thread Abhishek Patel
I am using EsBolt, which receives a stream from a Kafka bolt. The Kafka bolt 
emits JSON, which is passed to EsBolt. My JSON string may contain geo data, 
so I want it to be indexed geospatially. How can I define the indexing 
schema for EsBolt?
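As far as I know, EsBolt (like the rest of es-hadoop) does not define field mappings itself; with `es.index.auto.create` the index is created using dynamic mapping, which does not detect geo points. A common approach is to create the index with an explicit mapping before the topology starts. The sketch below builds such a request with the JDK 11+ HTTP client. The host, the index name `index`, the type `type` and the field name `location` are assumptions, and the body uses the Elasticsearch 1.x format with a mapping type (newer versions drop mapping types).

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class GeoMapping {
    // Create-index body (Elasticsearch 1.x style, with a mapping type) that
    // declares the hypothetical field geoField as geo_point.
    static String createIndexBody(String type, String geoField) {
        return "{\"mappings\":{\"" + type + "\":{\"properties\":{\""
                + geoField + "\":{\"type\":\"geo_point\"}}}}}";
    }

    // PUT <baseUrl>/<index> with the body above; send it once, before the topology starts.
    static HttpRequest createIndexRequest(String baseUrl, String index, String body) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + index))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String body = createIndexBody("type", "location");
        System.out.println(body);
        if (args.length > 0) { // e.g. http://localhost:9200
            HttpRequest request = createIndexRequest(args[0], "index", body);
            HttpResponse<String> response =
                    HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }
}
```

The documents then need the field in a form `geo_point` accepts, such as `{"lat": 41.12, "lon": -71.34}` or the string `"41.12,-71.34"`.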



Storm Integration - EsBolt configuration to set Tokenizer

2014-12-25 Thread Abhishek Patel

I am using EsBolt to index a map whose keys and values are strings. When I 
index the map and then run a query from the Elasticsearch web UI, I can see 
that my fields are not indexed the way I need, because I am not able to 
perform a case-sensitive search. 
For example, my map contained the value 'John'. When I search for 'john', 
the result is returned, but when I search for 'John', no results are found. 
I guess I am not configuring the bolt properly. Please guide me on how to 
configure a tokenizer in EsBolt. I want to set the tokenizer from my Java 
program, not with curl.

Abhishek Patel  
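One likely explanation: string fields go through the `standard` analyzer by default, which lowercases tokens at index time, so 'John' is stored as `john` and a query that is not analyzed the same way (such as a `term` query for `John`) finds nothing. As far as I know, analysis is configured per index in Elasticsearch rather than in EsBolt, so a sketch under assumptions is to create the index from Java before starting the topology, with a custom analyzer that uses the `standard` tokenizer but no `lowercase` filter, and assign it to the field. The index name, type, field name `name` and analyzer name `case_sensitive` are hypothetical; the mapping uses the 1.x-era `"type": "string"` (newer versions use `text`).

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class CaseSensitiveIndex {
    // Index settings with a custom analyzer (standard tokenizer, no lowercase filter)
    // and a 1.x-style mapping that applies it to the given field.
    static String createIndexBody(String type, String field) {
        return "{\"settings\":{\"analysis\":{\"analyzer\":{\"case_sensitive\":"
                + "{\"type\":\"custom\",\"tokenizer\":\"standard\"}}}},"
                + "\"mappings\":{\"" + type + "\":{\"properties\":{\"" + field
                + "\":{\"type\":\"string\",\"analyzer\":\"case_sensitive\"}}}}}";
    }

    // PUT <baseUrl>/<index> creates the index with these settings and mapping.
    static HttpRequest createIndexRequest(String baseUrl, String index, String body) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + index))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String body = createIndexBody("type", "name");
        System.out.println(body);
        if (args.length > 0) { // e.g. http://localhost:9200
            HttpResponse<String> response = HttpClient.newHttpClient().send(
                    createIndexRequest(args[0], "index", body), HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }
}
```

Analysis settings cannot be changed on an existing index's fields, so this has to run before EsBolt writes the first document.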
