Re: Howto Use The Elasticsearch Producer
Glad to hear it! I'll add a JIRA task to always log the exception. We
shouldn't need to enable debug logging to fix issues like this.

On Tue, May 10, 2016 at 10:16 AM, Tim Washington <tim.washing...@fundingcircle.com> wrote:

> Hey Jacob, right you are.
>
> I added a "**" logback configuration. And lo and behold, this exception
> detail popped up.
>
> 10:05:47.520 [main] DEBUG o.a.samza.container.SamzaContainer$ - Exception detail:
> java.lang.RuntimeException: Could not instantiate class *org.apache.samza.system.elasticsearch.clientTransport.ClientFactory*
>     at org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory.getClient(ElasticsearchSystemFactory.java:79)
>     at org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory.getProducer(ElasticsearchSystemFactory.java:54)
>     at org.apache.samza.container.SamzaContainer$$anonfun$13.apply(SamzaContainer.scala:195)
>     ...
>     at clojure.lang.Var.invoke(Var.java:383)
>     at clojure.lang.AFn.applyToHelper(AFn.java:156)
>     at clojure.lang.Var.applyTo(Var.java:700)
>     at clojure.main.main(main.java:37)
>
> I fixed the typo (in bold red), and that solved the problem :)
>
> Cheers
>
> Tim
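The idea of always logging the exception could look roughly like the sketch below. It is not Samza code and not the eventual JIRA change: `ProducerFailureLog`, `reportFailure`, and the logger name are hypothetical, and `java.util.logging` stands in for whatever logging facade the container actually uses. The point is only that the cause is attached to a message logged at a level that is visible by default.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper (not part of Samza): reports a producer failure with its cause.
final class ProducerFailureLog {
    // Logger name is an assumption made for this sketch.
    static final Logger LOG = Logger.getLogger("producer.failure.sketch");

    // Logs at WARNING and passes the Throwable along, so the stack trace
    // is emitted without having to enable DEBUG output.
    static void reportFailure(String systemName, Exception cause) {
        LOG.log(Level.WARNING,
                "Failed to create a producer for " + systemName + ", so skipping.",
                cause);
    }

    public static void main(String[] args) {
        reportFailure("elasticsearch",
                new RuntimeException("Could not instantiate class <configured client factory>"));
    }
}
```

With something like this in place, the "Could not instantiate class" detail from Tim's log would have appeared in the first run.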
Re: Howto Use The Elasticsearch Producer
Hey Jacob, right you are.

I added a "**" logback configuration. And lo and behold, this exception
detail popped up.

10:05:47.520 [main] DEBUG o.a.samza.container.SamzaContainer$ - Exception detail:
java.lang.RuntimeException: Could not instantiate class *org.apache.samza.system.elasticsearch.clientTransport.ClientFactory*
    at org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory.getClient(ElasticsearchSystemFactory.java:79)
    at org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory.getProducer(ElasticsearchSystemFactory.java:54)
    at org.apache.samza.container.SamzaContainer$$anonfun$13.apply(SamzaContainer.scala:195)
    ...
    at clojure.lang.Var.invoke(Var.java:383)
    at clojure.lang.AFn.applyToHelper(AFn.java:156)
    at clojure.lang.Var.applyTo(Var.java:700)
    at clojure.main.main(main.java:37)

I fixed the typo (in bold red), and that solved the problem :)

Cheers

Tim

On 10 May 2016 at 09:08, Jacob Maes <jacob.m...@gmail.com> wrote:

> Hey Tim,
>
> Can you include the full log? Since the producer is not showing up, I'd
> expect to see some earlier messages explaining why. In particular, this
> section of code from SamzaContainer.scala should print errors instantiating
> the producers:
>
>     val producers = systemFactories
>       .map {
>         case (systemName, systemFactory) =>
>           try {
>             (systemName, systemFactory.getProducer(systemName, config, samzaContainerMetrics.registry))
>           } catch {
>             case e: Exception =>
>               info("Failed to create a producer for %s, so skipping." format systemName)
>               debug("Exception detail:", e)
>               (systemName, null)
>           }
>       }
>       .filter(_._2 != null)
>       .toMap
>
>     info("Got system producers: %s" format producers.keys)
>
> And from the looks of it, it would be a good idea to enable debug logging
> at least for org.apache.samza.container, so we can see the exception
> detail.
>
> Thanks,
> Jake
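A mistyped class name in the config, like the one Tim found, can be caught before the job starts by checking that each configured class is loadable. The sketch below is one way to do that, under stated assumptions: `FactoryClassCheck` and its methods are hypothetical, and treating every key ending in `.factory` or `.class` as a class name is a heuristic chosen for this example, not a Samza rule. The thread does not state the corrected class name, so none is given here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical pre-flight check (not part of Samza).
final class FactoryClassCheck {
    // True if the named class is visible to this class loader; the class is not initialized.
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, FactoryClassCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Returns the entries whose key looks like a class setting (assumed suffixes:
    // ".factory" or ".class") and whose value cannot be loaded.
    static Map<String, String> unloadable(Map<String, String> config) {
        Map<String, String> bad = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            String key = entry.getKey();
            boolean looksLikeClass = key.endsWith(".factory") || key.endsWith(".class");
            if (looksLikeClass && !isLoadable(entry.getValue())) {
                bad.put(key, entry.getValue());
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        // Value copied from the exception in this thread (the mistyped name).
        config.put("systems.elasticsearch.client.factory",
                "org.apache.samza.system.elasticsearch.clientTransport.ClientFactory");
        // Made-up key with a JDK class, only to show a passing entry.
        config.put("example.list.class", "java.util.ArrayList");
        System.out.println("Unloadable entries: " + unloadable(config));
    }
}
```

The check is only meaningful when run with the same classpath as the job; without the Samza jars present, correctly spelled Samza class names would be reported as unloadable too.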
Re: Howto Use The Elasticsearch Producer
Hey Tim,

Can you include the full log? Since the producer is not showing up, I'd
expect to see some earlier messages explaining why. In particular, this
section of code from SamzaContainer.scala should print errors instantiating
the producers:

    val producers = systemFactories
      .map {
        case (systemName, systemFactory) =>
          try {
            (systemName, systemFactory.getProducer(systemName, config, samzaContainerMetrics.registry))
          } catch {
            case e: Exception =>
              info("Failed to create a producer for %s, so skipping." format systemName)
              debug("Exception detail:", e)
              (systemName, null)
          }
      }
      .filter(_._2 != null)
      .toMap

    info("Got system producers: %s" format producers.keys)

And from the looks of it, it would be a good idea to enable debug logging
at least for org.apache.samza.container, so we can see the exception
detail.

Thanks,
Jake

On Tue, May 10, 2016 at 8:36 AM, Tim Washington <tim.washing...@fundingcircle.com> wrote:

> *Tim Washington* | *Senior Software Engineer*
>
> *tim.washing...@fundingcircle.com <renee.nich...@fundingcircle.com>*
>
> 747 Front St, 4th Fl | San Francisco, CA 94111
>
> *Our Mission: **T**o build a better financial world*
>
> -- Forwarded message ------
> From: Tim Washington <tim.washing...@fundingcircle.com>
> Date: 9 May 2016 at 15:40
> Subject: Howto Use The Elasticsearch Producer
> To: dev-subscr...@samza.apache.org
> Cc: Andy Chambers <andy.chamb...@fundingcircle.com>, Charles Reese <charles.re...@fundingcircle.com>
>
> Hey guys,
>
> I'm trying to use the Elasticsearch Producer, as described in this message
> <https://reviews.apache.org/r/36768/diff/1#3>. But I run into an exception,
> trying to write out the message.
>
> 15:15:59.586 [ThreadJob] INFO o.a.samza.container.SamzaContainer - Entering run loop.
> 15:16:13.998 [ThreadJob] INFO o.a.samza.container.TaskInstance - SystemStreamPartition [kafka, ledger, 0] is catched up.
> 15:16:14.006 [ThreadJob] INFO ledger.elastic.job - #object[org.apache.samza.container.TaskInstance$$anon$1 0x20904646 org.apache.samza.container.TaskInstance$$anon$1@20904646]
> 15:16:14.011 [ThreadJob] INFO ledger.elastic.job - {:class org.apache.samza.system.IncomingMessageEnvelope, :key nil, :message :qwerty, :offset 5, :size 0, :system-stream-partition {:class org.apache.samza.system.SystemStreamPartition, :partition {:class org.apache.samza.Partition, :partition-id 0}, :stream ledger, :system kafka, :system-stream {:class org.apache.samza.system.SystemStream, :stream ledger, :system kafka}}}
> 15:16:14.016 [ThreadJob] ERROR o.a.samza.container.SamzaContainer - Caught exception in process loop.
> *org.apache.samza.SamzaException: Attempting to produce to unknown system:
> elasticsearch. Available systems: Set(kafka). Please add the system to your
> configuration, or update outgoing message envelope to send to a defined
> system.*
>     at org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
>     at org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
>     ...
>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
>     at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
>
> But the source of
> <https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala#L86>
> the stack trace line, tells me that the code can't find a *producer* in the
> elasticsearch system. The code that I'm using is straight out of the
> example.
>
>     collector.send(new OutgoingMessageEnvelope(new SystemStream("elasticsearch", ELASTICSEARCH_INDEX + "/" + ELASTICSEARCH_TYPE), parsedJsonObject));
>
> And if we print out the config, I'm indeed seeing *elasticsearch* as a
> system.
>
> 14:09:19.978 [main] INFO ledger.run - ;;
> ;;  ___  __ _ _ __ ___      __ _
> ;; / __|/ _` | '_ ` _ \|_  / _` |
> ;; \__ \ (_| | | | | | |/ / (_| |
> ;; |___/\__,_|_| |_| |_/___\__,_|
> ;;
>
> 14:09:19.980 [main] INFO ledger.run - ;; >>>>>>> ledger-elastic-connector <<<<<<<
> 14:09:19.980 [main] INFO ledger.run - ;;
> 14:09:19.981 [main] INFO ledger.run - ;; job.coordinator.replication.factor=1
> 14:09:19.981 [main] INFO ledger.run - ;; job.coordinator.system=kafka
> 14:09:19.981 [main] INFO ledger.run - ;; job.factory.class=org.apache.samza.job.local.ThreadJob
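The behaviour Jake describes explains why a system that is present in the config can still be "unknown" at send time: a factory that throws is skipped, so its system never reaches the producer map. The Java sketch below imitates that flow under stated assumptions. `SkippedProducerDemo`, `buildProducers`, and `send` are hypothetical stand-ins, with `Supplier<Object>` in place of Samza's system factories and `IllegalStateException` in place of `SamzaException`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical imitation of the quoted SamzaContainer.scala logic (not Samza code).
final class SkippedProducerDemo {
    // Builds one producer per system; a failing or null-returning factory is dropped.
    static Map<String, Object> buildProducers(Map<String, Supplier<Object>> factories) {
        Map<String, Object> producers = new LinkedHashMap<>();
        for (Map.Entry<String, Supplier<Object>> entry : factories.entrySet()) {
            try {
                Object producer = entry.getValue().get();
                if (producer != null) {
                    producers.put(entry.getKey(), producer);
                }
            } catch (RuntimeException e) {
                // Like the quoted snippet: only a short notice, the cause is not shown here.
                System.out.println("Failed to create a producer for " + entry.getKey() + ", so skipping.");
            }
        }
        return producers;
    }

    // Fails for any system that was dropped while the producers were built.
    static void send(Map<String, Object> producers, String system) {
        if (!producers.containsKey(system)) {
            throw new IllegalStateException("Attempting to produce to unknown system: "
                    + system + ". Available systems: " + producers.keySet());
        }
    }

    public static void main(String[] args) {
        Map<String, Supplier<Object>> factories = new LinkedHashMap<>();
        factories.put("kafka", () -> "kafka-producer");
        factories.put("elasticsearch", () -> {
            throw new RuntimeException("Could not instantiate class");
        });
        Map<String, Object> producers = buildProducers(factories);
        try {
            send(producers, "elasticsearch");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The send-time error names only the missing system; the real cause was raised earlier, at startup, which is why the earlier log lines matter.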
Fwd: Howto Use The Elasticsearch Producer
*Tim Washington* | *Senior Software Engineer*

*tim.washing...@fundingcircle.com <renee.nich...@fundingcircle.com>*

747 Front St, 4th Fl | San Francisco, CA 94111

*Our Mission: **T**o build a better financial world*

-- Forwarded message --
From: Tim Washington <tim.washing...@fundingcircle.com>
Date: 9 May 2016 at 15:40
Subject: Howto Use The Elasticsearch Producer
To: dev-subscr...@samza.apache.org
Cc: Andy Chambers <andy.chamb...@fundingcircle.com>, Charles Reese <charles.re...@fundingcircle.com>

Hey guys,

I'm trying to use the Elasticsearch Producer, as described in this message
<https://reviews.apache.org/r/36768/diff/1#3>. But I run into an exception,
trying to write out the message.

15:15:59.586 [ThreadJob] INFO o.a.samza.container.SamzaContainer - Entering run loop.
15:16:13.998 [ThreadJob] INFO o.a.samza.container.TaskInstance - SystemStreamPartition [kafka, ledger, 0] is catched up.
15:16:14.006 [ThreadJob] INFO ledger.elastic.job - #object[org.apache.samza.container.TaskInstance$$anon$1 0x20904646 org.apache.samza.container.TaskInstance$$anon$1@20904646]
15:16:14.011 [ThreadJob] INFO ledger.elastic.job - {:class org.apache.samza.system.IncomingMessageEnvelope, :key nil, :message :qwerty, :offset 5, :size 0, :system-stream-partition {:class org.apache.samza.system.SystemStreamPartition, :partition {:class org.apache.samza.Partition, :partition-id 0}, :stream ledger, :system kafka, :system-stream {:class org.apache.samza.system.SystemStream, :stream ledger, :system kafka}}}
15:16:14.016 [ThreadJob] ERROR o.a.samza.container.SamzaContainer - Caught exception in process loop.
*org.apache.samza.SamzaException: Attempting to produce to unknown system:
elasticsearch. Available systems: Set(kafka). Please add the system to your
configuration, or update outgoing message envelope to send to a defined
system.*
    at org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
    at org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
    ...
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
    at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)

But the source of
<https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala#L86>
the stack trace line, tells me that the code can't find a *producer* in the
elasticsearch system. The code that I'm using is straight out of the
example.

    collector.send(new OutgoingMessageEnvelope(new SystemStream("elasticsearch", ELASTICSEARCH_INDEX + "/" + ELASTICSEARCH_TYPE), parsedJsonObject));

And if we print out the config, I'm indeed seeing *elasticsearch* as a
system.

14:09:19.978 [main] INFO ledger.run - ;;
;;  ___  __ _ _ __ ___      __ _
;; / __|/ _` | '_ ` _ \|_  / _` |
;; \__ \ (_| | | | | | |/ / (_| |
;; |___/\__,_|_| |_| |_/___\__,_|
;;

14:09:19.980 [main] INFO ledger.run - ;; >>>>>>> ledger-elastic-connector <<<<<<<
14:09:19.980 [main] INFO ledger.run - ;;
14:09:19.981 [main] INFO ledger.run - ;; job.coordinator.replication.factor=1
14:09:19.981 [main] INFO ledger.run - ;; job.coordinator.system=kafka
14:09:19.981 [main] INFO ledger.run - ;; job.factory.class=org.apache.samza.job.local.ThreadJobFactory
14:09:19.981 [main] INFO ledger.run - ;; job.name=ledger-elastic-connector
14:09:19.981 [main] INFO ledger.run - ;; samja.task.factory.function=ledger.elastic.job/elastic-connector
14:09:19.981 [main] INFO ledger.run - ;; serializers.registry.edn.class=samja.core.EDNSerdeFactory
14:09:19.981 [main] INFO ledger.run - ;; systems.elasticsearch.client.elasticsearch.cluster.name=elasticsearch
14:09:19.982 [main] INFO ledger.run - ;; systems.elasticsearch.client.factory=org.apache.samza.system.elasticsearch.clientTransport.ClientFactory
14:09:19.982 [main] INFO ledger.run - ;; systems.elasticsearch.client.transport.host=localhost
14:09:19.982 [main] INFO ledger.run - ;; systems.elasticsearch.client.transport.port=9300
14:09:19.982 [main] INFO ledger.run - ;; systems.elasticsearch.index.request.factory=org.apache.samza.system.elasticsearch.indexrequest.DefaultIndexRequestFactory
14:09:19.982 [main] INFO ledger.run - ;; systems.elasticsearch.samza.factory=org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory
14:09:19.982 [main] INFO ledger.run - ;; systems.kafka.consumer.zookeeper.connect=192.168.99.100:2181
14:09:19.982 [main] INFO ledger.run - ;; systems.kafka.producer.bootstrap.servers=192.168.99.100:9092
14:09:19.982 [main] INFO ledger.run - ;; systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
14:09:19.982 [main] INFO ledger.run - ;; systems.kafka.samza.key.serde=edn
14:09:19.982 [main] INFO ledger.run - ;; systems.kafka.samza.msg.serde=edn
14:09:19.982 [main] INFO ledger.run - ;; task.checkpoint.factory=or