ElasticSearch Acknowledgements
Previously, I used a Storm topology for my pipeline. The use case was: read from a Kafka topic through a Kafka spout, process the tuples in a bolt, then emit them to an ElasticSearch bolt (EsBolt) for indexing. In that topology I could acknowledge each tuple as it was indexed; the acknowledgements flowed from EsBolt back to the spout and finally to Zookeeper. I later realised that Storm was not the appropriate component for my use case and switched to Spark. I am doing the same thing with Spark: read messages from a Kafka topic into a Spark stream, pass the stream to a component that converts it to RDDs, and finally index those RDDs. But now there is no acknowledgement mechanism, and I still want to send an acknowledgement to the upstream component for each message that gets indexed. My code looks like this -

JavaEsSpark.saveJsonToEs(jrd, "index_{date}/type");

How can I get acknowledgements in the Spark-ElasticSearch framework?

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group. To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscr...@googlegroups.com. To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/f73a57fd-1f5a-41f3-b6ec-077961668687%40googlegroups.com. For more options, visit https://groups.google.com/d/optout.
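[Editor's note] Spark and elasticsearch-hadoop have no per-message ack channel like Storm's tuple anchoring; saveJsonToEs either completes for a partition or throws. A common substitute is batch-level acknowledgement: wrap the save call (e.g. inside foreachRDD of the stream) and notify the upstream component on success or failure. Below is a minimal sketch of that pattern; indexWithAck and the callback are illustrative names, not part of any Spark or elasticsearch-hadoop API.

```java
import java.util.function.Consumer;

public class AckWrapper {

    // Run an indexing action and report batch-level success/failure to an
    // upstream acknowledgement callback. In a real job the Runnable would
    // wrap JavaEsSpark.saveJsonToEs(...) inside foreachRDD, and the Consumer
    // would notify the previous component (e.g. commit Kafka offsets only
    // when ok == true).
    static boolean indexWithAck(Runnable indexAction, Consumer<Boolean> ack) {
        try {
            indexAction.run();
            ack.accept(true);   // whole batch indexed
            return true;
        } catch (RuntimeException e) {
            ack.accept(false);  // batch failed; upstream can replay it
            return false;
        }
    }

    public static void main(String[] args) {
        indexWithAck(() -> System.out.println("indexing batch..."),
                     ok -> System.out.println("ack: " + ok));
    }
}
```

Note this only gives at-least-once, batch-granularity semantics; saveJsonToEs does not expose per-document results.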
Re: elasticsearch-hadoop for spark, index documents from a RDD in different index by day: myindex-2014-01-01 for example
Hi Julien,

Yes, that is probably the only possible workaround. What I plan to do is calculate the index name before writing, add a field named "indexname" to my JSON, and then use:

JavaEsSpark.saveJsonToEs(jrd, "index_{indexname}/type");

Thanks for the reply.
Abhishek
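[Editor's note] The workaround described above needs each JSON document to carry its own "indexname" value before the RDD is handed to saveJsonToEs. A minimal sketch of that step, assuming flat JSON object strings (withIndexName is a hypothetical helper, not an elasticsearch-hadoop API):

```java
public class IndexNameField {

    // Splice a precomputed "indexname" field into a flat JSON object string,
    // so that saveJsonToEs(jrd, "index_{indexname}/type") can route each
    // document to its own index.
    static String withIndexName(String json, String indexName) {
        String trimmed = json.trim();
        int brace = trimmed.lastIndexOf('}');
        String sep = trimmed.equals("{}") ? "" : ",";
        return trimmed.substring(0, brace) + sep
                + "\"indexname\":\"" + indexName + "\"}";
    }

    public static void main(String[] args) {
        // Prints {"time":1422904680,"indexname":"1422835200_1422921599"}
        System.out.println(
                withIndexName("{\"time\":1422904680}", "1422835200_1422921599"));
    }
}
```

In a Spark job this would run inside a map over the RDD of JSON strings, before the save call.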
Re: elasticsearch-hadoop for spark, index documents from a RDD in different index by day: myindex-2014-01-01 for example
Hi Julien,

I am trying to achieve something similar. In my case, my JSON contains a field "time" in Unix time, and I want to partition my indexes by this field into 24-hour buckets. That is, if JSON1 contains 1422904680 in "time" and JSON2 contains 1422991080 in "time", then I want indexes partitioned by time (24 hours) like:

index_1422835200_1422921599 - which will contain JSON1, because "time" = 1422904680 falls in its range;
index_1422921600_1423007999 - which will contain JSON2, because "time" = 1422991080 falls in its range.

I want to create these indexes dynamically. There is also the possibility of receiving a JSON whose "time" lies before the current date, so the index name has to be calculated per document at write time. Programmatically, I want to achieve something like:

JavaEsSpark.saveJsonToEs(jrd, "index_{ {time} - {time} % 86400 }_{ {time} - {time} % 86400 + 86399 }/type");

Is it possible to achieve dynamic index name creation as described above? If not, is there any workaround for my use case?

Thanks,
Abhishek
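[Editor's note] The bucketing arithmetic above is easy to check in plain Java: the bucket start is the timestamp rounded down to a multiple of 86400 seconds, and the end is start + 86399. A self-contained sketch (the method name and "index_" prefix follow the examples in the post):

```java
public class IndexNames {

    static final long DAY = 86400L;

    // Compute a daily-bucket index name from a Unix timestamp in seconds,
    // e.g. 1422904680 -> "index_1422835200_1422921599".
    static String indexName(long epochSeconds) {
        long start = epochSeconds - (epochSeconds % DAY);
        long end = start + DAY - 1;
        return "index_" + start + "_" + end;
    }

    public static void main(String[] args) {
        System.out.println(indexName(1422904680L)); // index_1422835200_1422921599
        System.out.println(indexName(1422991080L)); // index_1422921600_1423007999
    }
}
```

Computing this up front and storing it in an "indexname" field matches the workaround discussed in this thread.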
Unable to write to ElasticSearch using Spark in Java (throws java.lang.IncompatibleClassChangeError: Implementing class)
I am using a simple Java program to index a Spark JavaRDD into ElasticSearch. My code looks like this -

SparkConf conf = new SparkConf().setAppName("IndexDemo").setMaster("spark://ct-0094:7077");
conf.set("spark.serializer", org.apache.spark.serializer.KryoSerializer.class.getName());
conf.set("es.index.auto.create", "true");
conf.set("es.nodes", "192.168.50.103");
conf.set("es.port", "9200");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.addJar("./target/SparkPOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar");

String arrayval = "string";
List<Data> data = Arrays.asList(
        new Data(1L, 10L, arrayval + "1"),
        new Data(2L, 20L, arrayval + "2"),
        new Data(3L, 30L, arrayval + "3"),
        new Data(4L, 40L, arrayval + "4"),
        new Data(5L, 50L, arrayval + "5"),
        new Data(6L, 60L, arrayval + "6"),
        new Data(7L, 70L, arrayval + "7"),
        new Data(8L, 80L, arrayval + "8"),
        new Data(9L, 90L, arrayval + "9"),
        new Data(10L, 100L, arrayval + "10"));
JavaRDD<Data> javaRDD = sc.parallelize(data);
saveToEs(javaRDD, "index/type");

Running the above code throws this exception (stack trace) -

15/01/16 13:20:41 INFO spark.SecurityManager: Changing view acls to: root
15/01/16 13:20:41 INFO spark.SecurityManager: Changing modify acls to: root
15/01/16 13:20:41 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/01/16 13:20:41 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/01/16 13:20:41 INFO Remoting: Starting remoting
15/01/16 13:20:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@ct-0015:55586]
15/01/16 13:20:41 INFO util.Utils: Successfully started service 'sparkDriver' on port 55586.
15/01/16 13:20:41 INFO spark.SparkEnv: Registering MapOutputTracker
15/01/16 13:20:41 INFO spark.SparkEnv: Registering BlockManagerMaster
15/01/16 13:20:41 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20150116132041-f924
15/01/16 13:20:41 INFO storage.MemoryStore: MemoryStore started with capacity 2.3 GB
15/01/16 13:20:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/01/16 13:20:41 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-a65b108f-e131-480a-85b2-ed65650cf991
15/01/16 13:20:42 INFO spark.HttpServer: Starting HTTP Server
15/01/16 13:20:42 INFO server.Server: jetty-8.1.14.v20131031
15/01/16 13:20:42 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:34049
15/01/16 13:20:42 INFO util.Utils: Successfully started service 'HTTP file server' on port 34049.
15/01/16 13:20:42 INFO server.Server: jetty-8.1.14.v20131031
15/01/16 13:20:42 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/01/16 13:20:42 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
15/01/16 13:20:42 INFO ui.SparkUI: Started SparkUI at http://ct-0015:4040
15/01/16 13:20:42 INFO client.AppClient$ClientActor: Connecting to master spark://ct-0094:7077...
15/01/16 13:20:42 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150116131933-0078
15/01/16 13:20:42 INFO netty.NettyBlockTransferService: Server created on 34762
15/01/16 13:20:42 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/01/16 13:20:42 INFO storage.BlockManagerMasterActor: Registering block manager ct-0015:34762 with 2.3 GB RAM, BlockManagerId(, ct-0015, 34762)
15/01/16 13:20:42 INFO storage.BlockManagerMaster: Registered BlockManager
15/01/16 13:20:42 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
15/01/16 13:20:43 INFO spark.SparkContext: Added JAR ./target/SparkPOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar at http://192.168.50.103:34049/jars/SparkPOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1421394643161
Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:455)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:367)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader
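[Editor's note] java.lang.IncompatibleClassChangeError: Implementing class generally means a class on the classpath was compiled against a different (binary-incompatible) version of an interface or superclass than the one loaded at runtime; with elasticsearch-hadoop this is typically a Hadoop 1.x vs 2.x or Spark/Scala version mismatch inside the fat jar. A first check is that the build pulls the Spark-specific artifact matching the cluster. A hedged Maven sketch; the versions shown are illustrative, not taken from the post, and must be aligned with the deployed cluster:

```xml
<!-- Illustrative only: align these versions with the Spark and Scala
     versions actually running on the cluster. -->
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-spark_2.10</artifactId>
  <version>2.1.0.Beta3</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.2.0</version>
  <scope>provided</scope>
</dependency>
```

Marking Spark as provided also keeps the fat jar from shipping a second, conflicting Spark copy to the executors.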
EsBolt - Schema Definition
I am using EsBolt, which receives a stream from a Kafka bolt. The Kafka bolt emits JSON, which is passed to EsBolt. My JSON string may contain geo data, and I want it to be indexed geo-spatially. How can I define the indexing schema (mapping) for EsBolt? Thanks
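[Editor's note] EsBolt itself does not define mappings; with es.index.auto.create ElasticSearch infers field types dynamically, and geo coordinates will come out as plain numbers or strings. Geo fields have to be mapped as geo_point before the first document is indexed, by creating the index with an explicit mapping. A sketch of such a mapping body for a 1.x cluster; the type and field names here ("type", "location") are assumptions for illustration:

```json
{
  "mappings": {
    "type": {
      "properties": {
        "location": { "type": "geo_point" }
      }
    }
  }
}
```

This body would be sent with an HTTP PUT to the index URL (or via the Java client's create-index API) before the topology starts writing.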
Storm Integration - EsBolt configuration to set Tokenizer
Hi,

I am using EsBolt to index a map whose keys and values are strings. When I index the map and then run queries from the ElasticSearch web UI, I found that my fields are not indexed properly, because I am not able to perform a case-sensitive search. For example, my map contained the value 'John'. When I search for 'john', the result is returned, but when I search for 'John', no results are found. I guess I am not configuring the bolt properly. Please guide me on how to configure a tokenizer for EsBolt. I want to set the tokenizer from my Java program, not via curl.

Thanks,
Abhishek Patel
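[Editor's note] The behaviour described is the default standard analyzer at work: it lowercases terms at index time, so 'John' is stored as 'john'. EsBolt has no tokenizer setting of its own; analysis is configured on the index, not on the bolt. One option is to map the field as not_analyzed (ElasticSearch 1.x string syntax), which stores the exact value and makes matching case-sensitive. A sketch; the type and field names ("type", "name") are assumptions for illustration:

```json
{
  "mappings": {
    "type": {
      "properties": {
        "name": { "type": "string", "index": "not_analyzed" }
      }
    }
  }
}
```

The mapping can be created from Java with a plain HTTP PUT (or the Java client) before the topology runs; alternatively, a custom analyzer without the lowercase token filter would keep tokenization while preserving case.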