Re: Howto Use The Elasticsearch Producer

2016-05-10 Thread Tim Washington
Hey Jacob, right you are.

I added a debug-level logback configuration, and lo and behold, this
exception detail popped up.

10:05:47.520 [main] DEBUG o.a.samza.container.SamzaContainer$ - Exception detail:
java.lang.RuntimeException: Could not instantiate class *org.apache.samza.system.elasticsearch.clientTransport.ClientFactory*
at org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory.getClient(ElasticsearchSystemFactory.java:79)
at org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory.getProducer(ElasticsearchSystemFactory.java:54)
at org.apache.samza.container.SamzaContainer$$anonfun$13.apply(SamzaContainer.scala:195)
...
at clojure.lang.Var.invoke(Var.java:383)
at clojure.lang.AFn.applyToHelper(AFn.java:156)
at clojure.lang.Var.applyTo(Var.java:700)
at clojure.main.main(main.java:37)
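
For reference, a logger entry along the following lines is one way to surface that DEBUG output with logback. It is only a sketch: the exact entry used is not shown in this thread, and the logger name is an assumption taken from Jake's suggestion to enable debug logging for org.apache.samza.container.

```xml
<!-- Assumed entry, placed inside <configuration> in logback.xml: -->
<!-- enables DEBUG output for the Samza container package only -->
<logger name="org.apache.samza.container" level="DEBUG"/>
```

Scoping the logger to that one package keeps the rest of the job's logging at its normal level.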



I fixed the typo in the class name (shown in bold above), and that solved the problem :)
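
For anyone landing here with the same error: the bolded class name has the package boundary in the wrong place. Assuming the stock samza-elasticsearch module, where the transport client factory is `TransportClientFactory` in the `client` package, the corrected property would look roughly like this:

```properties
# Mistyped value from the job config:
#   org.apache.samza.system.elasticsearch.clientTransport.ClientFactory
# Corrected value (assumes the stock samza-elasticsearch TransportClientFactory):
systems.elasticsearch.client.factory=org.apache.samza.system.elasticsearch.client.TransportClientFactory
```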

Cheers

Tim


On 10 May 2016 at 09:08, Jacob Maes wrote:

> Hey Tim,
>
> Can you include the full log? Since the producer is not showing up, I'd
> expect to see some earlier messages explaining why. In particular, this
> section of code from SamzaContainer.scala should print errors instantiating
> the producers:
>
> > val producers = systemFactories
> >   .map {
> >     case (systemName, systemFactory) =>
> >       try {
> >         (systemName, systemFactory.getProducer(systemName, config, samzaContainerMetrics.registry))
> >       } catch {
> >         case e: Exception =>
> >           info("Failed to create a producer for %s, so skipping." format systemName)
> >           debug("Exception detail:", e)
> >           (systemName, null)
> >       }
> >   }
> >   .filter(_._2 != null)
> >   .toMap
> >
> > info("Got system producers: %s" format producers.keys)
>
>
> And from the looks of it, it would be a good idea to enable debug logging
> at least for org.apache.samza.container, so we can see the exception
> detail.
>
> Thanks,
> Jake

Fwd: Howto Use The Elasticsearch Producer

2016-05-10 Thread Tim Washington
*Tim Washington* | *Senior Software Engineer*

*tim.washing...@fundingcircle.com *

747 Front St, 4th Fl | San Francisco, CA 94111

*Our Mission: To build a better financial world*




-- Forwarded message --
From: Tim Washington 
Date: 9 May 2016 at 15:40
Subject: Howto Use The Elasticsearch Producer
To: dev-subscr...@samza.apache.org
Cc: Andy Chambers, Charles Reese <charles.re...@fundingcircle.com>


Hey guys,

I'm trying to use the Elasticsearch Producer, as described in this message
<https://reviews.apache.org/r/36768/diff/1#3>, but I run into an exception
when trying to write out a message.

15:15:59.586 [ThreadJob] INFO  o.a.samza.container.SamzaContainer - Entering run loop.
15:16:13.998 [ThreadJob] INFO  o.a.samza.container.TaskInstance - SystemStreamPartition [kafka, ledger, 0] is catched up.
15:16:14.006 [ThreadJob] INFO  ledger.elastic.job - #object[org.apache.samza.container.TaskInstance$$anon$1 0x20904646 org.apache.samza.container.TaskInstance$$anon$1@20904646]
15:16:14.011 [ThreadJob] INFO  ledger.elastic.job - {:class org.apache.samza.system.IncomingMessageEnvelope, :key nil, :message :qwerty, :offset 5, :size 0, :system-stream-partition {:class org.apache.samza.system.SystemStreamPartition, :partition {:class org.apache.samza.Partition, :partition-id 0}, :stream ledger, :system kafka, :system-stream {:class org.apache.samza.system.SystemStream, :stream ledger, :system kafka}}}
15:16:14.016 [ThreadJob] ERROR o.a.samza.container.SamzaContainer - Caught exception in process loop.
*org.apache.samza.SamzaException: Attempting to produce to unknown system: elasticsearch. Available systems: Set(kafka). Please add the system to your configuration, or update outgoing message envelope to send to a defined system.*
at org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
at org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
...
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)


But the source of the stack trace line
<https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala#L86>
tells me that the code can't find a *producer* for the elasticsearch
system. The code that I'm using is straight out of the example.

collector.send(new OutgoingMessageEnvelope(
    new SystemStream("elasticsearch", ELASTICSEARCH_INDEX + "/" + ELASTICSEARCH_TYPE),
    parsedJsonObject));


And if we print out the config, I'm indeed seeing *elasticsearch* as a
system.


14:09:19.978 [main] INFO  ledger.run - ;;
;;  ___  __ _ _ __ ___  __ _
;; / __|/ _` | '_ ` _ \|_  / _` |
;; \__ \ (_| | | | | | |/ / (_| |
;; |___/\__,_|_| |_| |_/___\__,_|
;;

14:09:19.980 [main] INFO  ledger.run - ;; >>>>>>> ledger-elastic-connector <<<<<<<
14:09:19.980 [main] INFO  ledger.run - ;;
14:09:19.981 [main] INFO  ledger.run - ;; job.coordinator.replication.factor=1
14:09:19.981 [main] INFO  ledger.run - ;; job.coordinator.system=kafka
14:09:19.981 [main] INFO  ledger.run - ;; job.factory.class=org.apache.samza.job.local.ThreadJobFactory
14:09:19.981 [main] INFO  ledger.run - ;; job.name=ledger-elastic-connector
14:09:19.981 [main] INFO  ledger.run - ;; samja.task.factory.function=ledger.elastic.job/elastic-connector
14:09:19.981 [main] INFO  ledger.run - ;; serializers.registry.edn.class=samja.core.EDNSerdeFactory
14:09:19.981 [main] INFO  ledger.run - ;; systems.elasticsearch.client.elasticsearch.cluster.name=elasticsearch
14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.client.factory=org.apache.samza.system.elasticsearch.clientTransport.ClientFactory
14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.client.transport.host=localhost
14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.client.transport.port=9300
14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.index.request.factory=org.apache.samza.system.elasticsearch.indexrequest.DefaultIndexRequestFactory
14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.samza.factory=org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory
14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.consumer.zookeeper.connect=192.168.99.100:2181
14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.producer.bootstrap.servers=192.168.99.100:9092
14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.samza.key.serde=edn
14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.samza.msg.serde=edn
14:09:19.982 [main] INFO  ledger.run - ;; task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
14:09:19.982 [main] INFO  ledger.run - ;; task.checkpoint.replication