*Tim Washington* | *Senior Software Engineer* *tim.washing...@fundingcircle.com <renee.nich...@fundingcircle.com>*
747 Front St, 4th Fl | San Francisco, CA 94111
*Our Mission: To build a better financial world*

---------- Forwarded message ----------
From: Tim Washington <tim.washing...@fundingcircle.com>
Date: 9 May 2016 at 15:40
Subject: Howto Use The Elasticsearch Producer
To: dev-subscr...@samza.apache.org
Cc: Andy Chambers <andy.chamb...@fundingcircle.com>, Charles Reese <charles.re...@fundingcircle.com>

Hey guys,

I'm trying to use the Elasticsearch producer, as described in this review <https://reviews.apache.org/r/36768/diff/1#3>, but I run into an exception when trying to write out the message:

15:15:59.586 [ThreadJob] INFO o.a.samza.container.SamzaContainer - Entering run loop.
15:16:13.998 [ThreadJob] INFO o.a.samza.container.TaskInstance - SystemStreamPartition [kafka, ledger, 0] is catched up.
15:16:14.006 [ThreadJob] INFO ledger.elastic.job - #object[org.apache.samza.container.TaskInstance$$anon$1 0x20904646 org.apache.samza.container.TaskInstance$$anon$1@20904646]
15:16:14.011 [ThreadJob] INFO ledger.elastic.job - {:class org.apache.samza.system.IncomingMessageEnvelope, :key nil, :message :qwerty, :offset 5, :size 0, :system-stream-partition {:class org.apache.samza.system.SystemStreamPartition, :partition {:class org.apache.samza.Partition, :partition-id 0}, :stream ledger, :system kafka, :system-stream {:class org.apache.samza.system.SystemStream, :stream ledger, :system kafka}}}
15:16:14.016 [ThreadJob] ERROR o.a.samza.container.SamzaContainer - Caught exception in process loop.
*org.apache.samza.SamzaException: Attempting to produce to unknown system: elasticsearch. Available systems: Set(kafka). Please add the system to your configuration, or update outgoing message envelope to send to a defined system.*
    at org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
    at org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
    ...
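Judging from the exception text, the container resolves each outgoing envelope's system name against a map of producers, and the "Available systems" list is just that map's keys. The sketch below is a simplified plain-Java model of that lookup, not Samza's actual `SystemProducers` code; the `Producer` interface and the `ProducerLookupSketch` class are hypothetical stand-ins. It shows why `elasticsearch` fails when only `kafka` has a registered producer, even though the envelope itself is well formed.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Simplified model (NOT Samza's real SystemProducers code) of resolving an
 * outgoing envelope's system name to a producer.
 */
public class ProducerLookupSketch {
    // Hypothetical stand-in for a Samza SystemProducer.
    interface Producer {
        void send(String stream, Object message);
    }

    // system name -> producer; only systems registered here can be written to
    private final Map<String, Producer> producers = new TreeMap<>();

    void register(String systemName, Producer producer) {
        producers.put(systemName, producer);
    }

    void send(String systemName, String stream, Object message) {
        Producer producer = producers.get(systemName);
        if (producer == null) {
            // Same shape as the logged error: "available systems" are the map's keys.
            throw new IllegalStateException(
                "Attempting to produce to unknown system: " + systemName
                    + ". Available systems: " + producers.keySet());
        }
        producer.send(stream, message);
    }

    public static void main(String[] args) {
        ProducerLookupSketch sketch = new ProducerLookupSketch();
        // Only kafka gets a producer, mirroring "Available systems: Set(kafka)".
        sketch.register("kafka", (stream, msg) -> System.out.println("kafka <- " + stream));
        sketch.send("kafka", "ledger", ":qwerty");
        try {
            sketch.send("elasticsearch", "my-index/my-type", ":qwerty");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the fix is never in the `send` call itself; it is whatever step populates the producer map, which in Samza is driven by the job's config.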
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
    at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)

But the source of the stack trace line <https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala#L86> tells me that the code can't find a *producer* for the elasticsearch system. The code that I'm using is straight out of the example:

    collector.send(new OutgoingMessageEnvelope(new SystemStream("elasticsearch", ELASTICSEARCH_INDEX + "/" + ELASTICSEARCH_TYPE), parsedJsonObject));

And if we print out the config, I do indeed see *elasticsearch* as a system:

14:09:19.978 [main] INFO ledger.run - ;; [samza ASCII-art banner]
14:09:19.980 [main] INFO ledger.run - ;; >>>>>>> ledger-elastic-connector <<<<<<<
14:09:19.980 [main] INFO ledger.run - ;;
14:09:19.981 [main] INFO ledger.run - ;; job.coordinator.replication.factor=1
14:09:19.981 [main] INFO ledger.run - ;; job.coordinator.system=kafka
14:09:19.981 [main] INFO ledger.run - ;; job.factory.class=org.apache.samza.job.local.ThreadJobFactory
14:09:19.981 [main] INFO ledger.run - ;; job.name=ledger-elastic-connector
14:09:19.981 [main] INFO ledger.run - ;; samja.task.factory.function=ledger.elastic.job/elastic-connector
14:09:19.981 [main] INFO ledger.run - ;; serializers.registry.edn.class=samja.core.EDNSerdeFactory
14:09:19.981 [main] INFO ledger.run - ;; systems.elasticsearch.client.elasticsearch.cluster.name=elasticsearch
14:09:19.982 [main] INFO ledger.run - ;; systems.elasticsearch.client.factory=org.apache.samza.system.elasticsearch.clientTransport.ClientFactory
14:09:19.982 [main] INFO ledger.run - ;; systems.elasticsearch.client.transport.host=localhost
14:09:19.982 [main] INFO ledger.run - ;; systems.elasticsearch.client.transport.port=9300
14:09:19.982 [main] INFO ledger.run - ;; systems.elasticsearch.index.request.factory=org.apache.samza.system.elasticsearch.indexrequest.DefaultIndexRequestFactory
14:09:19.982 [main] INFO ledger.run - ;; systems.elasticsearch.samza.factory=org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory
14:09:19.982 [main] INFO ledger.run - ;; systems.kafka.consumer.zookeeper.connect=192.168.99.100:2181
14:09:19.982 [main] INFO ledger.run - ;; systems.kafka.producer.bootstrap.servers=192.168.99.100:9092
14:09:19.982 [main] INFO ledger.run - ;; systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
14:09:19.982 [main] INFO ledger.run - ;; systems.kafka.samza.key.serde=edn
14:09:19.982 [main] INFO ledger.run - ;; systems.kafka.samza.msg.serde=edn
14:09:19.982 [main] INFO ledger.run - ;; task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
14:09:19.982 [main] INFO ledger.run - ;; task.checkpoint.replication.factor=1
14:09:19.982 [main] INFO ledger.run - ;; task.checkpoint.system=kafka
14:09:19.982 [main] INFO ledger.run - ;; task.class=samja.task.SimpleTask
14:09:19.983 [main] INFO ledger.run - ;; task.inputs=kafka.ledger,kafka.cash-movement-performed,kafka.ledger-entry-added

Is there any other setup, or any switches I need to flip, to be able to successfully write a message out to Elasticsearch?

Thanks
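Following Samza's `systems.<name>.samza.factory` naming convention, the config dump above declares two systems, `elasticsearch` and `kafka`, while the exception reports only `kafka`. One way to compare the two is to extract system names from whichever config the container actually receives (for example, logged from inside the task) and check them against the error's list. The sketch below is a hypothetical plain-Java helper (`ConfiguredSystemsSketch` is not part of Samza) that does that extraction from flat `key=value` lines; it assumes the lines have already had their log prefixes stripped.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical helper: list system names declared in a flat "key=value"
 * Samza config dump via the systems.<name>.samza.factory convention.
 */
public class ConfiguredSystemsSketch {
    private static final String PREFIX = "systems.";
    private static final String FACTORY_SUFFIX = ".samza.factory";

    static Set<String> configuredSystems(List<String> configLines) {
        Set<String> systems = new TreeSet<>(); // sorted for stable output
        for (String line : configLines) {
            int eq = line.indexOf('=');
            if (eq < 0) {
                continue; // not a key=value line
            }
            String key = line.substring(0, eq).trim();
            // Only keys of the form systems.<name>.samza.factory declare a system.
            if (key.startsWith(PREFIX) && key.endsWith(FACTORY_SUFFIX)
                    && key.length() > PREFIX.length() + FACTORY_SUFFIX.length()) {
                systems.add(key.substring(PREFIX.length(), key.length() - FACTORY_SUFFIX.length()));
            }
        }
        return systems;
    }

    public static void main(String[] args) {
        // A few lines copied from the config dump above.
        List<String> lines = Arrays.asList(
            "systems.elasticsearch.samza.factory=org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory",
            "systems.elasticsearch.client.transport.host=localhost",
            "systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory",
            "task.inputs=kafka.ledger");
        System.out.println(configuredSystems(lines)); // prints [elasticsearch, kafka]
    }
}
```

If the same check against the container's config yielded only `kafka`, that would match the exception and point at how the config reaches the container rather than at the `collector.send` call.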