Re: Howto Use The Elasticsearch Producer

2016-05-10 Thread Jacob Maes
Glad to hear it!

I'll add a JIRA task to always log the exception. We shouldn't need to
enable debug logging to fix issues like this.
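A minimal sketch of the "always log the exception" idea. It is written in Java rather than Samza's Scala, and it uses `java.util.logging` in place of the project's real logger. Both choices are assumptions for illustration, and `ProducerFailureLogging` and its methods are hypothetical. The skip message and the exception go out together in one record at WARNING, a level that is enabled by default. The current code instead writes an INFO line and keeps the exception detail at DEBUG.

```java
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical sketch: java.util.logging stands in for Samza's own logging,
// and the class and method names are made up for illustration.
class ProducerFailureLogging {

    /** Builds a WARNING record that carries the exception, so handlers print its stack trace. */
    static LogRecord skippedProducerRecord(String systemName, Exception e) {
        LogRecord record = new LogRecord(Level.WARNING,
                "Failed to create a producer for " + systemName + ", so skipping.");
        record.setThrown(e);
        return record;
    }

    static void logSkippedProducer(Logger log, String systemName, Exception e) {
        log.log(skippedProducerRecord(systemName, e));
    }

    public static void main(String[] args) {
        Logger log = Logger.getLogger("org.apache.samza.container.SamzaContainer");
        // With the default JUL configuration (INFO and above to the console), the
        // message and the full stack trace appear without any debug settings.
        logSkippedProducer(log, "elasticsearch",
                new RuntimeException("Could not instantiate class ..."));
    }
}
```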

On Tue, May 10, 2016 at 10:16 AM, Tim Washington <tim.washing...@fundingcircle.com> wrote:

> Hey Jacob, right you are.
> ...

Re: Howto Use The Elasticsearch Producer

2016-05-10 Thread Tim Washington
Hey Jacob, right you are.

I added a "**" logback configuration. And lo and behold, this exception detail popped up.

10:05:47.520 [main] DEBUG o.a.samza.container.SamzaContainer$ - Exception detail:
java.lang.RuntimeException: Could not instantiate class *org.apache.samza.system.elasticsearch.clientTransport.ClientFactory*
    at org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory.getClient(ElasticsearchSystemFactory.java:79)
    at org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory.getProducer(ElasticsearchSystemFactory.java:54)
    at org.apache.samza.container.SamzaContainer$$anonfun$13.apply(SamzaContainer.scala:195)
    ...
    at clojure.lang.Var.invoke(Var.java:383)
    at clojure.lang.AFn.applyToHelper(AFn.java:156)
    at clojure.lang.Var.applyTo(Var.java:700)
    at clojure.main.main(main.java:37)



I fixed the typo in the class name (shown in bold above), and that solved the problem :)
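The typo is easy to reproduce. This sketch assumes the configured `client.factory` value is turned into an object by reflective lookup. That is a guess at what `ElasticsearchSystemFactory.getClient` does, not taken from its source. Under that assumption, a misplaced dot in the class name fails with a `ClassNotFoundException`, and a helper like the hypothetical `instantiate` below wraps it into the same `Could not instantiate class` message. The intended class is presumably `org.apache.samza.system.elasticsearch.client.TransportClientFactory`; check that name against the samza-elasticsearch version in use.

```java
// Hypothetical helper (not Samza's code): build an object from a class name held in config.
class ConfigClassLoading {
    static Object instantiate(String className) {
        try {
            return Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // A misspelled name lands here as ClassNotFoundException.
            throw new RuntimeException("Could not instantiate class " + className, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(instantiate("java.util.ArrayList").getClass().getName());
        try {
            instantiate("java.utilArray.List"); // misplaced dot, the same kind of typo as above
        } catch (RuntimeException e) {
            System.out.println(e.getMessage() + " (cause: "
                    + e.getCause().getClass().getSimpleName() + ")");
        }
    }
}
```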

Cheers

Tim


On 10 May 2016 at 09:08, Jacob Maes <jacob.m...@gmail.com> wrote:

> Hey Tim,
> ...

Re: Howto Use The Elasticsearch Producer

2016-05-10 Thread Jacob Maes
Hey Tim,

Can you include the full log? Since the producer is not showing up, I'd
expect to see some earlier messages explaining why. In particular, this
section of code from SamzaContainer.scala should log any errors it hits while
instantiating the producers:

val producers = systemFactories
  .map {
    case (systemName, systemFactory) =>
      try {
        (systemName, systemFactory.getProducer(systemName, config, samzaContainerMetrics.registry))
      } catch {
        case e: Exception =>
          info("Failed to create a producer for %s, so skipping." format systemName)
          debug("Exception detail:", e)
          (systemName, null)
      }
  }
  .filter(_._2 != null)
  .toMap

info("Got system producers: %s" format producers.keys)
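The excerpt explains the later `Attempting to produce to unknown system` error. A factory that throws is simply left out of the producer map, and the failure resurfaces only when a message is sent to that system. Below is a rough Java analogue; the class `ProducerRegistry` and its `Supplier`-based factories are hypothetical, not Samza's API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical Java analogue of the Scala excerpt above; not Samza's actual classes.
class ProducerRegistry {
    private final Map<String, Object> producers = new LinkedHashMap<>();

    ProducerRegistry(Map<String, Supplier<Object>> factories) {
        for (Map.Entry<String, Supplier<Object>> entry : factories.entrySet()) {
            try {
                producers.put(entry.getKey(), entry.getValue().get());
            } catch (RuntimeException e) {
                // Like the excerpt: note the skip and carry on without this system.
                System.err.println("Failed to create a producer for " + entry.getKey() + ", so skipping.");
            }
        }
    }

    /** Looks up a producer; an unknown system only fails here, at send time. */
    Object producerFor(String systemName) {
        Object producer = producers.get(systemName);
        if (producer == null) {
            throw new IllegalStateException("Attempting to produce to unknown system: "
                    + systemName + ". Available systems: " + producers.keySet());
        }
        return producer;
    }

    public static void main(String[] args) {
        Map<String, Supplier<Object>> factories = new LinkedHashMap<>();
        factories.put("kafka", () -> "kafka-producer");
        factories.put("elasticsearch", () -> {
            throw new RuntimeException("Could not instantiate class ...");
        });

        ProducerRegistry registry = new ProducerRegistry(factories);
        System.out.println(registry.producerFor("kafka"));
        try {
            registry.producerFor("elasticsearch");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints the Kafka producer and then an unknown-system message that lists only `[kafka]`, the same shape as the error in the original report.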


And from the looks of it, it would be a good idea to enable debug logging
at least for org.apache.samza.container, so we can see the exception
detail.
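With logback, this means adding a `logger` element for `org.apache.samza.container` with level DEBUG. Any hierarchical logging framework works the same way. The minimal sketch below uses `java.util.logging` instead, a substitution for illustration in which FINE plays the role of DEBUG. It shows that a debug record from `SamzaContainer` is dropped at the default INFO level. The same record comes through once the level is lowered on the parent package logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Sketch with java.util.logging standing in for logback; FINE plays the role of DEBUG.
class PackageDebugLevel {
    // Keep strong references: JUL holds named loggers weakly, so a level set on an
    // otherwise unreferenced logger could be garbage-collected away.
    static final Logger PACKAGE = Logger.getLogger("org.apache.samza.container");
    static final Logger CONTAINER = Logger.getLogger("org.apache.samza.container.SamzaContainer");
    static final List<LogRecord> SEEN = new ArrayList<>();

    static {
        // Record everything that gets past the logger's level check.
        CONTAINER.addHandler(new Handler() {
            @Override public void publish(LogRecord record) { SEEN.add(record); }
            @Override public void flush() {}
            @Override public void close() {}
        });
    }

    /** Emits one FINE record from the container logger; returns whether it was published. */
    static boolean debugVisible() {
        int before = SEEN.size();
        CONTAINER.fine("Exception detail: ...");
        return SEEN.size() > before;
    }

    public static void main(String[] args) {
        System.out.println("default level: " + debugVisible());   // root logger is INFO by default
        PACKAGE.setLevel(Level.FINE);                                // the child inherits this level
        System.out.println("package at FINE: " + debugVisible());
    }
}
```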

Thanks,
Jake

On Tue, May 10, 2016 at 8:36 AM, Tim Washington <tim.washing...@fundingcircle.com> wrote:

> -- Forwarded message --
> ...

Fwd: Howto Use The Elasticsearch Producer

2016-05-10 Thread Tim Washington
*Tim Washington* | *Senior Software Engineer*

*tim.washing...@fundingcircle.com*

747 Front St, 4th Fl | San Francisco, CA 94111

*Our Mission: To build a better financial world*




-- Forwarded message --
From: Tim Washington <tim.washing...@fundingcircle.com>
Date: 9 May 2016 at 15:40
Subject: Howto Use The Elasticsearch Producer
To: dev-subscr...@samza.apache.org
Cc: Andy Chambers <andy.chamb...@fundingcircle.com>, Charles Reese <
charles.re...@fundingcircle.com>


Hey guys,

I'm trying to use the Elasticsearch Producer, as described in this message
<https://reviews.apache.org/r/36768/diff/1#3>, but I run into an exception
when trying to write out the message.

15:15:59.586 [ThreadJob] INFO  o.a.samza.container.SamzaContainer - Entering run loop.
15:16:13.998 [ThreadJob] INFO  o.a.samza.container.TaskInstance - SystemStreamPartition [kafka, ledger, 0] is catched up.
15:16:14.006 [ThreadJob] INFO  ledger.elastic.job - #object[org.apache.samza.container.TaskInstance$$anon$1 0x20904646 org.apache.samza.container.TaskInstance$$anon$1@20904646]
15:16:14.011 [ThreadJob] INFO  ledger.elastic.job - {:class org.apache.samza.system.IncomingMessageEnvelope, :key nil, :message :qwerty, :offset 5, :size 0, :system-stream-partition {:class org.apache.samza.system.SystemStreamPartition, :partition {:class org.apache.samza.Partition, :partition-id 0}, :stream ledger, :system kafka, :system-stream {:class org.apache.samza.system.SystemStream, :stream ledger, :system kafka}}}
15:16:14.016 [ThreadJob] ERROR o.a.samza.container.SamzaContainer - Caught exception in process loop.
*org.apache.samza.SamzaException: Attempting to produce to unknown system: elasticsearch. Available systems: Set(kafka). Please add the system to your configuration, or update outgoing message envelope to send to a defined system.*
    at org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
    at org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
    ...
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
    at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)


But the source of the stack trace line
(<https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala#L86>)
tells me that the code can't find a *producer* for the elasticsearch system.
The code I'm using is straight out of the example:

collector.send(new OutgoingMessageEnvelope(
    new SystemStream("elasticsearch", ELASTICSEARCH_INDEX + "/" + ELASTICSEARCH_TYPE),
    parsedJsonObject));
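The stream name packs the Elasticsearch index and document type into one `{index}/{type}` string. This assumes the configured `DefaultIndexRequestFactory` splits that name on `/`, which has not been checked against its source here. Under that assumption, here is a minimal Java sketch of the parsing, using a hypothetical class `IndexAndType`:

```java
// Hypothetical parser for a SystemStream name of the form "{index}/{type}".
final class IndexAndType {
    final String index;
    final String type;

    private IndexAndType(String index, String type) {
        this.index = index;
        this.type = type;
    }

    static IndexAndType parse(String streamName) {
        String[] parts = streamName.split("/");
        // Reject "ledger", "ledger/", "/entry" and names with more than one slash.
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException(
                    "Expected a stream name of the form {index}/{type}, got: " + streamName);
        }
        return new IndexAndType(parts[0], parts[1]);
    }

    public static void main(String[] args) {
        // "ledger" and "entry" are made-up stand-ins for ELASTICSEARCH_INDEX and ELASTICSEARCH_TYPE.
        IndexAndType target = parse("ledger" + "/" + "entry");
        System.out.println(target.index + " / " + target.type);
    }
}
```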


And if I print out the config, *elasticsearch* does show up as a system:


14:09:19.978 [main] INFO  ledger.run - ;;
;;  ___  __ _ _ __ ___  __ _
;; / __|/ _` | '_ ` _ \|_  / _` |
;; \__ \ (_| | | | | | |/ / (_| |
;; |___/\__,_|_| |_| |_/___\__,_|
;;

14:09:19.980 [main] INFO  ledger.run - ;; >>>>>>> ledger-elastic-connector <<<<<<<
14:09:19.980 [main] INFO  ledger.run - ;;
14:09:19.981 [main] INFO  ledger.run - ;; job.coordinator.replication.factor=1
14:09:19.981 [main] INFO  ledger.run - ;; job.coordinator.system=kafka
14:09:19.981 [main] INFO  ledger.run - ;; job.factory.class=org.apache.samza.job.local.ThreadJobFactory
14:09:19.981 [main] INFO  ledger.run - ;; job.name=ledger-elastic-connector
14:09:19.981 [main] INFO  ledger.run - ;; samja.task.factory.function=ledger.elastic.job/elastic-connector
14:09:19.981 [main] INFO  ledger.run - ;; serializers.registry.edn.class=samja.core.EDNSerdeFactory
14:09:19.981 [main] INFO  ledger.run - ;; systems.elasticsearch.client.elasticsearch.cluster.name=elasticsearch
14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.client.factory=org.apache.samza.system.elasticsearch.clientTransport.ClientFactory
14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.client.transport.host=localhost
14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.client.transport.port=9300
14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.index.request.factory=org.apache.samza.system.elasticsearch.indexrequest.DefaultIndexRequestFactory
14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.samza.factory=org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory
14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.consumer.zookeeper.connect=192.168.99.100:2181
14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.producer.bootstrap.servers=192.168.99.100:9092
14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.samza.key.serde=edn
14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.samza.msg.serde=edn
14:09:19.982 [main] INFO  ledger.run - ;; task.checkpoint.factory=or