ElasticSearch Acknowledgements

2015-02-02 Thread Abhishek Patel
Previously, I was using a Storm topology to build my pipeline. My use case 
was: read from a Kafka topic through a Kafka spout, process the tuples in a 
bolt, then emit them to an ElasticSearch bolt to index them. In the Storm 
topology I was able to acknowledge each tuple I indexed, and these 
acknowledgements were sent from the EsBolt back to the spout and finally to 
Zookeeper.

Later I realised that Storm is not the appropriate component for my use 
case, and I switched to Spark. I am doing the same thing with Spark: 
reading from the Kafka topic into a Spark stream, passing the stream to a 
component that converts it into RDDs, and finally indexing those RDDs. But 
now there is no acknowledgement mechanism, and I still want to send an 
acknowledgement to the previous component for each message that gets 
indexed.

My code looks like this - 

JavaEsSpark.saveJsonToEs(jrd, "index_{date}/type");

How can I get acknowledgements in the Spark-ElasticSearch framework?
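In the meantime, the closest pattern I have found is batch-level acknowledgement: treat the whole RDD write as the unit, and only mark a batch's messages (e.g. their Kafka offsets) as processed once the write to Elasticsearch has succeeded. A plain-Java illustration of that ordering (this is not es-hadoop API; the names are mine):

```java
import java.util.ArrayList;
import java.util.List;

// Spark has no per-tuple acknowledgement path like Storm's, so the
// substitute is batch-level: acknowledge a batch's offsets only after
// the write for the whole batch completed without error.
public class BatchAcker {
    // Returns the offsets that may be acknowledged for this batch.
    public static List<Long> indexAndAck(List<Long> offsets, boolean writeSucceeded) {
        // writeSucceeded stands in for JavaEsSpark.saveJsonToEs(...)
        // returning without throwing.
        if (!writeSucceeded) {
            return new ArrayList<>(); // ack nothing; the batch gets replayed
        }
        return new ArrayList<>(offsets); // ack only after a successful write
    }
}
```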

-- 
You received this message because you are subscribed to the Google Groups 
"elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to elasticsearch+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/elasticsearch/f73a57fd-1f5a-41f3-b6ec-077961668687%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.


Re: elasticsearch-hadoop for spark, index documents from a RDD in different index by day: myindex-2014-01-01 for example

2015-02-02 Thread Abhishek Patel
Hi Julien

Yes, that's probably the only possible workaround. What I am planning to do 
is calculate the index name prior to writing, add a field named "indexname" 
to my JSON, and then use

  JavaEsSpark.saveJsonToEs(jrd, "index_{indexname}/type");

Thanks for the reply.
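The enrichment step might look like this (a naive string splice for illustration; a real pipeline would use a JSON library such as Jackson):

```java
// Work-around sketch: compute the index name up front and inject it as
// an extra "indexname" field, so the documents can be written with the
// pattern "index_{indexname}/type".
public class IndexNameEnricher {
    // Splices the field in right after the opening brace of a JSON object.
    public static String withIndexName(String json, String indexName) {
        return "{\"indexname\":\"" + indexName + "\"," + json.substring(1);
    }
}
```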



Abhishek



Re: elasticsearch-hadoop for spark, index documents from a RDD in different index by day: myindex-2014-01-01 for example

2015-02-02 Thread Abhishek Patel
Hi Julien,

I am trying to achieve something similar. In my case, my JSON contains a 
field "time" holding a Unix timestamp, and I want to partition my indexes 
by this field. That is, if JSON1 contains 1422904680 in "time" and JSON2 
contains 1422991080, then I want to create indexes partitioned into 
24-hour buckets, like:

index_1422835200_1422921599 - will contain JSON1, because its "time" value 
1422904680 falls in that range.
index_1422921600_1423007999 - will contain JSON2, because its "time" value 
1422991080 falls in that range.

I want to create these indexes dynamically. There is also the possibility 
of receiving a JSON whose "time" value falls before the current date.

To achieve this, I need to compute the index name dynamically at write 
time. Programmatically, I want to achieve something like this:

 JavaEsSpark.saveJsonToEs(jrd, "index_{ {time} - {time} % 86400 }_{ {time} 
- {time} % 86400 + 86399 }/type");

Is it possible to create dynamic index names as described above? If not, 
is there any workaround for my use case?
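As far as I know, es-hadoop's index patterns only substitute plain field values, not arithmetic expressions, so the bucket computation has to happen before writing. A small helper that produces the names from the example above (class and index-name prefix are mine):

```java
// Compute a day-bucketed index name from a Unix timestamp (seconds).
// The bucket runs from midnight UTC to the last second of the same day,
// matching index_1422835200_1422921599 etc. from the example.
public class IndexNamer {
    static final long DAY = 86400L;

    public static String indexFor(long epochSeconds) {
        long start = epochSeconds - (epochSeconds % DAY); // midnight UTC
        long end = start + DAY - 1;                       // last second of the day
        return "index_" + start + "_" + end;
    }
}
```

The result could then be stored in each document (e.g. in an "indexname" field) and used with `saveJsonToEs(jrd, "index_{indexname}/type")`.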

Thanks
Abhishek



Unable to write in ElasticSearch using Spark in java (throws java.lang.IncompatibleClassChangeError: Implementing class exception)

2015-01-16 Thread Abhishek Patel
I am using a simple Java program to index a Spark JavaRDD into 
Elasticsearch. My code looks like this -

SparkConf conf = new SparkConf()
        .setAppName("IndexDemo")
        .setMaster("spark://ct-0094:7077");
conf.set("spark.serializer", org.apache.spark.serializer.KryoSerializer.class.getName());
conf.set("es.index.auto.create", "true");
conf.set("es.nodes", "192.168.50.103");
conf.set("es.port", "9200");
JavaSparkContext sc = new JavaSparkContext(conf);

sc.addJar("./target/SparkPOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar");

String arrayval = "string";
List<Data> data = Arrays.asList(
        new Data(1L, 10L, arrayval + "1"),
        new Data(2L, 20L, arrayval + "2"),
        new Data(3L, 30L, arrayval + "3"),
        new Data(4L, 40L, arrayval + "4"),
        new Data(5L, 50L, arrayval + "5"),
        new Data(6L, 60L, arrayval + "6"),
        new Data(7L, 70L, arrayval + "7"),
        new Data(8L, 80L, arrayval + "8"),
        new Data(9L, 90L, arrayval + "9"),
        new Data(10L, 100L, arrayval + "10")
);

JavaRDD<Data> javaRDD = sc.parallelize(data);
saveToEs(javaRDD, "index/type");

Running the above code throws an exception. Stack trace:

15/01/16 13:20:41 INFO spark.SecurityManager: Changing view acls to: root
15/01/16 13:20:41 INFO spark.SecurityManager: Changing modify acls to: root
15/01/16 13:20:41 INFO spark.SecurityManager: SecurityManager: 
authentication disabled; ui acls disabled; users with view permissions: 
Set(root); users with modify permissions: Set(root)
15/01/16 13:20:41 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/01/16 13:20:41 INFO Remoting: Starting remoting
15/01/16 13:20:41 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkDriver@ct-0015:55586]
15/01/16 13:20:41 INFO util.Utils: Successfully started service 
'sparkDriver' on port 55586.
15/01/16 13:20:41 INFO spark.SparkEnv: Registering MapOutputTracker
15/01/16 13:20:41 INFO spark.SparkEnv: Registering BlockManagerMaster
15/01/16 13:20:41 INFO storage.DiskBlockManager: Created local directory at 
/tmp/spark-local-20150116132041-f924
15/01/16 13:20:41 INFO storage.MemoryStore: MemoryStore started with 
capacity 2.3 GB
15/01/16 13:20:41 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
15/01/16 13:20:41 INFO spark.HttpFileServer: HTTP File server directory is 
/tmp/spark-a65b108f-e131-480a-85b2-ed65650cf991
15/01/16 13:20:42 INFO spark.HttpServer: Starting HTTP Server
15/01/16 13:20:42 INFO server.Server: jetty-8.1.14.v20131031
15/01/16 13:20:42 INFO server.AbstractConnector: Started 
SocketConnector@0.0.0.0:34049
15/01/16 13:20:42 INFO util.Utils: Successfully started service 'HTTP file 
server' on port 34049.
15/01/16 13:20:42 INFO server.Server: jetty-8.1.14.v20131031
15/01/16 13:20:42 INFO server.AbstractConnector: Started 
SelectChannelConnector@0.0.0.0:4040
15/01/16 13:20:42 INFO util.Utils: Successfully started service 'SparkUI' 
on port 4040.
15/01/16 13:20:42 INFO ui.SparkUI: Started SparkUI at http://ct-0015:4040
15/01/16 13:20:42 INFO client.AppClient$ClientActor: Connecting to master 
spark://ct-0094:7077...
15/01/16 13:20:42 INFO cluster.SparkDeploySchedulerBackend: Connected to 
Spark cluster with app ID app-20150116131933-0078
15/01/16 13:20:42 INFO netty.NettyBlockTransferService: Server created on 
34762
15/01/16 13:20:42 INFO storage.BlockManagerMaster: Trying to register 
BlockManager
15/01/16 13:20:42 INFO storage.BlockManagerMasterActor: Registering block 
manager ct-0015:34762 with 2.3 GB RAM, BlockManagerId(<driver>, ct-0015, 
34762)
15/01/16 13:20:42 INFO storage.BlockManagerMaster: Registered BlockManager
15/01/16 13:20:42 INFO cluster.SparkDeploySchedulerBackend: 
SchedulerBackend is ready for scheduling beginning after reached 
minRegisteredResourcesRatio: 0.0
15/01/16 13:20:43 INFO spark.SparkContext: Added JAR 
./target/SparkPOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar at 
http://192.168.50.103:34049/jars/SparkPOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar
 
with timestamp 1421394643161
Exception in thread "main" java.lang.IncompatibleClassChangeError: 
Implementing class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:455)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:367)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader

EsBolt - Schema Definition

2014-12-29 Thread Abhishek Patel
I am using EsBolt, which receives a stream from a Kafka bolt. The Kafka 
bolt emits JSON, which is handed to EsBolt. My JSON string may contain geo 
data, and I want it to be indexed geo-spatially. How can I define my 
indexing schema for EsBolt?
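My understanding so far is that EsBolt itself does not take a schema; the geo mapping has to exist on the index before the topology writes to it. A sketch of the mapping body I would PUT when creating the index (the field name "location" is only an example):

```java
// Build an Elasticsearch 1.x index mapping that types one field as
// geo_point. The resulting JSON would be PUT to the index up front,
// e.g. via an HTTP client; EsBolt then just writes documents into it.
public class GeoMapping {
    public static String mappingFor(String type, String geoField) {
        return "{\"mappings\":{\"" + type + "\":{\"properties\":{\""
                + geoField + "\":{\"type\":\"geo_point\"}}}}}";
    }
}
```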

Thanks 



Storm Integration - EsBolt configuration to set Tokenizer

2014-12-25 Thread Abhishek Patel
Hi

I am using EsBolt to index a map with string keys and values. When I index 
the map and then run queries from the Elasticsearch web UI, I noticed that 
my fields are not indexed the way I want, because I am not able to perform 
a case-sensitive search.
For example, my map contained the value 'John'. When I search for 'john', 
the result is returned, but when I search for 'John', no results are 
found. I guess I am not configuring the bolt properly. Please guide me on 
how to configure a tokenizer for EsBolt. I want to set the tokenizer from 
my Java program, not via curl.
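From what I can tell, the tokenizer cannot be set through EsBolt; analysis is configured in the index mapping. For a case-sensitive exact match, mapping the string field as not_analyzed (Elasticsearch 1.x) would keep 'John' as a single unmodified term. A sketch of the mapping body I would create before indexing (the field name "name" is only an example):

```java
// Build an Elasticsearch 1.x mapping that marks a string field as
// not_analyzed, so terms are stored verbatim (case preserved) and
// term queries match case-sensitively. This JSON would be PUT when
// creating the index, before the topology starts writing.
public class ExactMatchMapping {
    public static String mappingFor(String type, String field) {
        return "{\"mappings\":{\"" + type + "\":{\"properties\":{\""
                + field + "\":{\"type\":\"string\",\"index\":\"not_analyzed\"}}}}}";
    }
}
```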

Thanks
Abhishek Patel  
