Glad to hear it! I'll add a JIRA task to always log the exception. We shouldn't need to enable debug logging to fix issues like this.
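
Roughly what I have in mind, as a standalone Java sketch rather than the actual SamzaContainer.scala change: the exception is attached to the "skipping" message itself, so it is visible at the default log level. The class name ProducerLoaderSketch, the createProducers helper, the Function-based factories, and the use of java.util.logging are all assumptions made to keep the example self-contained; none of them are Samza APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the producer-loading loop; not Samza code.
final class ProducerLoaderSketch {
    private static final Logger LOG =
            Logger.getLogger(ProducerLoaderSketch.class.getName());

    /**
     * Builds one producer per system. A system whose factory throws is skipped,
     * but the exception is logged at WARNING together with the message, so the
     * cause is visible without enabling debug logging.
     */
    static <P> Map<String, P> createProducers(Map<String, Function<String, P>> factories) {
        Map<String, P> producers = new LinkedHashMap<>();
        for (Map.Entry<String, Function<String, P>> entry : factories.entrySet()) {
            String systemName = entry.getKey();
            try {
                producers.put(systemName, entry.getValue().apply(systemName));
            } catch (RuntimeException e) {
                // Passing the exception here prints its message and stack trace.
                LOG.log(Level.WARNING,
                        "Failed to create a producer for " + systemName + ", so skipping.", e);
            }
        }
        return producers;
    }

    public static void main(String[] args) {
        Map<String, Function<String, String>> factories = new LinkedHashMap<>();
        factories.put("kafka", name -> name + "-producer");
        // Simulated failure, standing in for a mistyped factory class name.
        factories.put("elasticsearch", name -> {
            throw new RuntimeException("Could not instantiate class (simulated)");
        });
        // Only the systems whose producers were created remain in the map.
        System.out.println(createProducers(factories).keySet());
    }
}
```

With this shape, a failing system is still skipped, but the reason shows up next to the "so skipping" line instead of being hidden behind a DEBUG logger.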

On Tue, May 10, 2016 at 10:16 AM, Tim Washington <tim.washing...@fundingcircle.com> wrote:

> Hey Jacob, right you are.
>
> I added a "*<logger name="org.apache.samza.container" level="DEBUG"/>*"
> logback configuration. And lo and behold, this exception detail popped up.
>
> 10:05:47.520 [main] DEBUG o.a.samza.container.SamzaContainer$ - Exception detail:
> java.lang.RuntimeException: Could not instantiate class *org.apache.samza.system.elasticsearch.clientTransport.ClientFactory*
>     at org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory.getClient(ElasticsearchSystemFactory.java:79)
>     at org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory.getProducer(ElasticsearchSystemFactory.java:54)
>     at org.apache.samza.container.SamzaContainer$$anonfun$13.apply(SamzaContainer.scala:195)
>     ...
>     at clojure.lang.Var.invoke(Var.java:383)
>     at clojure.lang.AFn.applyToHelper(AFn.java:156)
>     at clojure.lang.Var.applyTo(Var.java:700)
>     at clojure.main.main(main.java:37)
>
> I fixed the typo (in bold red), and that solved the problem :)
>
> Cheers
>
> Tim
>
> On 10 May 2016 at 09:08, Jacob Maes <jacob.m...@gmail.com> wrote:
>
> > Hey Tim,
> >
> > Can you include the full log? Since the producer is not showing up, I'd
> > expect to see some earlier messages explaining why. In particular, this
> > section of code from SamzaContainer.scala should print errors instantiating
> > the producers:
> >
> > > val producers = systemFactories
> > >   .map {
> > >     case (systemName, systemFactory) =>
> > >       try {
> > >         (systemName, systemFactory.getProducer(systemName, config, samzaContainerMetrics.registry))
> > >       } catch {
> > >         case e: Exception =>
> > >           info("Failed to create a producer for %s, so skipping." format systemName)
> > >           debug("Exception detail:", e)
> > >           (systemName, null)
> > >       }
> > >   }
> > >   .filter(_._2 != null)
> > >   .toMap
> > > info("Got system producers: %s" format producers.keys)
> >
> > And from the looks of it, it would be a good idea to enable debug logging
> > at least for org.apache.samza.container, so we can see the exception
> > detail.
> >
> > Thanks,
> > Jake
> >
> > On Tue, May 10, 2016 at 8:36 AM, Tim Washington <tim.washing...@fundingcircle.com> wrote:
> >
> > > *Tim Washington* | *Senior Software Engineer*
> > >
> > > *tim.washing...@fundingcircle.com <renee.nich...@fundingcircle.com>*
> > >
> > > 747 Front St, 4th Fl | San Francisco, CA 94111
> > >
> > > *Our Mission: **T**o build a better financial world*
> > >
> > > ---------- Forwarded message ----------
> > > From: Tim Washington <tim.washing...@fundingcircle.com>
> > > Date: 9 May 2016 at 15:40
> > > Subject: Howto Use The Elasticsearch Producer
> > > To: dev-subscr...@samza.apache.org
> > > Cc: Andy Chambers <andy.chamb...@fundingcircle.com>, Charles Reese <charles.re...@fundingcircle.com>
> > >
> > > Hey guys,
> > >
> > > I'm trying to use the Elasticsearch Producer, as described in this message
> > > <https://reviews.apache.org/r/36768/diff/1#3>. But I run into an exception,
> > > trying to write out the message.
> > >
> > > 15:15:59.586 [ThreadJob] INFO o.a.samza.container.SamzaContainer - Entering run loop.
> > > 15:16:13.998 [ThreadJob] INFO o.a.samza.container.TaskInstance - SystemStreamPartition [kafka, ledger, 0] is catched up.
> > > 15:16:14.006 [ThreadJob] INFO ledger.elastic.job - #object[org.apache.samza.container.TaskInstance$$anon$1 0x20904646 org.apache.samza.container.TaskInstance$$anon$1@20904646]
> > > 15:16:14.011 [ThreadJob] INFO ledger.elastic.job - {:class org.apache.samza.system.IncomingMessageEnvelope, :key nil, :message :qwerty, :offset 5, :size 0, :system-stream-partition {:class org.apache.samza.system.SystemStreamPartition, :partition {:class org.apache.samza.Partition, :partition-id 0}, :stream ledger, :system kafka, :system-stream {:class org.apache.samza.system.SystemStream, :stream ledger, :system kafka}}}
> > > 15:16:14.016 [ThreadJob] ERROR o.a.samza.container.SamzaContainer - Caught exception in process loop.
> > > *org.apache.samza.SamzaException: Attempting to produce to unknown system: elasticsearch. Available systems: Set(kafka). Please add the system to your configuration, or update outgoing message envelope to send to a defined system.*
> > >     at org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
> > >     at org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
> > >     ...
> > >     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
> > >     at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
> > >
> > > But the source of
> > > <https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala#L86>
> > > the stack trace line, tells me that the code can't find a *producer* in the
> > > elasticsearch system. The code that I'm using is straight out of the
> > > example.
> > >
> > > collector.send(new OutgoingMessageEnvelope(new SystemStream("elasticsearch", ELASTICSEARCH_INDEX + "/" + ELASTICSEARCH_TYPE), parsedJsonObject));
> > >
> > > And if we print out the config, I'm indeed seeing *elasticsearch* as a
> > > system.
> > >
> > > 14:09:19.978 [main] INFO ledger.run - ;;
> > > ;;  ___  __ _ _ __ ___  ______ _
> > > ;; / __|/ _` | '_ ` _ \|_  / _` |
> > > ;; \__ \ (_| | | | | | |/ / (_| |
> > > ;; |___/\__,_|_| |_| |_/___\__,_|
> > > ;;
> > >
> > > 14:09:19.980 [main] INFO ledger.run - ;; >>>>>>> ledger-elastic-connector <<<<<<<
> > > 14:09:19.980 [main] INFO ledger.run - ;;
> > > 14:09:19.981 [main] INFO ledger.run - ;; job.coordinator.replication.factor=1
> > > 14:09:19.981 [main] INFO ledger.run - ;; job.coordinator.system=kafka
> > > 14:09:19.981 [main] INFO ledger.run - ;; job.factory.class=org.apache.samza.job.local.ThreadJobFactory
> > > 14:09:19.981 [main] INFO ledger.run - ;; job.name=ledger-elastic-connector
> > > 14:09:19.981 [main] INFO ledger.run - ;; samja.task.factory.function=ledger.elastic.job/elastic-connector
> > > 14:09:19.981 [main] INFO ledger.run - ;; serializers.registry.edn.class=samja.core.EDNSerdeFactory
> > > 14:09:19.981 [main] INFO ledger.run - ;; systems.elasticsearch.client.elasticsearch.cluster.name=elasticsearch
> > > 14:09:19.982 [main] INFO ledger.run - ;; systems.elasticsearch.client.factory=org.apache.samza.system.elasticsearch.clientTransport.ClientFactory
> > > 14:09:19.982 [main] INFO ledger.run - ;; systems.elasticsearch.client.transport.host=localhost
> > > 14:09:19.982 [main] INFO ledger.run - ;; systems.elasticsearch.client.transport.port=9300
> > > 14:09:19.982 [main] INFO ledger.run - ;; systems.elasticsearch.index.request.factory=org.apache.samza.system.elasticsearch.indexrequest.DefaultIndexRequestFactory
> > > 14:09:19.982 [main] INFO ledger.run - ;; systems.elasticsearch.samza.factory=org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory
> > > 14:09:19.982 [main] INFO ledger.run - ;; systems.kafka.consumer.zookeeper.connect=192.168.99.100:2181
> > > 14:09:19.982 [main] INFO ledger.run - ;; systems.kafka.producer.bootstrap.servers=192.168.99.100:9092
> > > 14:09:19.982 [main] INFO ledger.run - ;; systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > > 14:09:19.982 [main] INFO ledger.run - ;; systems.kafka.samza.key.serde=edn
> > > 14:09:19.982 [main] INFO ledger.run - ;; systems.kafka.samza.msg.serde=edn
> > > 14:09:19.982 [main] INFO ledger.run - ;; task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> > > 14:09:19.982 [main] INFO ledger.run - ;; task.checkpoint.replication.factor=1
> > > 14:09:19.982 [main] INFO ledger.run - ;; task.checkpoint.system=kafka
> > > 14:09:19.982 [main] INFO ledger.run - ;; task.class=samja.task.SimpleTask
> > > 14:09:19.983 [main] INFO ledger.run - ;; task.inputs=kafka.ledger,kafka.cash-movement-performed,kafka.ledger-entry-added
> > >
> > > Is there any other setup, or switches I need to make, to be able to
> > > successfully write a message out to elasticsearch?
> > >
> > > Thanks