Re: Review Request 44920: SAMZA-680 Refactor the Samza AppMaster to support other cluster managers

2016-05-10 Thread Jagadish Venkatraman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/44920/
---

(Updated May 10, 2016, 11:50 p.m.)


Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Yi Pan 
(Data Infrastructure), Navina Ramesh, and Xinyu Liu.


Repository: samza


Description
---

1. Proposed new APIs for running Samza without Yarn. (SAMZA-881)
   - Defined the ClusterResourceManager abstraction. This abstracts out
     Yarn's and Mesos's interaction with Samza.
   - Defined SamzaResource and SamzaResourceRequest.
   - Rewrote the SamzaAppMaster logic into a ClusterBasedJobCoordinator.
2. Defined a ClusterManagerConfig to handle configurations independent of
   Yarn/Mesos.
3. Made Samza completely independent of Yarn. This cleanly separates
   Samza-specific components from Yarn-specific components.
4. Readability improvements to the existing code base.
   - Added explicit documentation for every method, member and class
     (including on thread-safety).
   - Made internal variables final to document intent and visibility across
     threads (trivially by adding modifiers, or by changing where they're
     initialized).
5. Refactored JobCoordinator into a JobModelReader.
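
As a rough illustration of the kind of separation described in item 1, here is a minimal sketch of a cluster-manager-neutral interface with an in-memory stand-in. Every class, method, and field name in it (ResourceRequestSketch, ResourceSketch, ResourceManagerSketch, InMemoryResourceManager) is a hypothetical placeholder chosen for this example; it is not the API defined in the patch, and the actual ClusterResourceManager may look quite different.

```java
import java.util.ArrayList;
import java.util.List;

// NOTE: every name and signature in this sketch is a hypothetical illustration,
// not the API defined in this patch.

// What the job coordinator asks the cluster manager for.
final class ResourceRequestSketch {
    final int cpuCores;
    final int memoryMb;
    final String preferredHost;

    ResourceRequestSketch(int cpuCores, int memoryMb, String preferredHost) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
        this.preferredHost = preferredHost;
    }
}

// What the cluster manager hands back once a request is satisfied.
final class ResourceSketch {
    final String resourceId;
    final String host;

    ResourceSketch(String resourceId, String host) {
        this.resourceId = resourceId;
        this.host = host;
    }
}

// The coordinator depends only on this interface, never on Yarn or Mesos types.
interface ResourceManagerSketch {
    interface Callback {
        void onResourcesAvailable(List<ResourceSketch> resources);
    }

    void requestResources(ResourceRequestSketch request);
}

// In-memory stand-in that grants every request immediately on the preferred host.
final class InMemoryResourceManager implements ResourceManagerSketch {
    private final Callback callback;
    private int nextId = 0;

    InMemoryResourceManager(Callback callback) {
        this.callback = callback;
    }

    @Override
    public void requestResources(ResourceRequestSketch request) {
        List<ResourceSketch> granted = new ArrayList<>();
        granted.add(new ResourceSketch("resource-" + nextId++, request.preferredHost));
        callback.onResourcesAvailable(granted);
    }
}
```

With a shape like this, a Yarn-backed or Mesos-backed implementation could be swapped in behind the same interface, and tests could use the in-memory version.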

== Diff 2 ==
Address Chris's review feedback.

Design Doc: 
https://issues.apache.org/jira/secure/attachment/12800342/Samza%20JobCoordinator%20Re-design%20Proposal_1.pdf

== Diff 3 ==
Address Yi's feedback

== Diff 4 ==
Sync with current master


Diffs (updated)
---

  checkstyle/import-control.xml fad7b55a540a2f9886a3bfdf7631d9661b602d29
  samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/clustermanager/ResourceManagerFactory.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/container/LocalityManager.java a3281c2c5f481a78cc4ae791c77d0e9805202477
  samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java 0cbdec8ac050de18c2fea191e3ef38273f1dbab1
  samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 0324e90a20c2fd31d1b7c6a0707aa3685a1cec20
  samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 31b208f47b12a4e9ef1134b1c0bfe532f6c07a80
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 03f48db7f42b2617995b14cf51248b82b6cc2636
  samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala 95d01dd15d369dcb2255154163bc2cd50d7b84e0
  samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 81ef59a8e2fd2745aa37a65074400b64c406bc28
  samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 5acfe875d3a3d9842497e646f0f04ea2861ae950
  samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala PRE-CREATION
  samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/clustermanager/MockHttpServer.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java PRE-CREATION

Review Request 47197: SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe

2016-05-10 Thread Jake Maes

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47197/
---

Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina 
Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-948
https://issues.apache.org/jira/browse/SAMZA-948


Repository: samza


Description
---

SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe

See the stack traces in the JIRA for more context. Essentially, the consumer
can bootstrap concurrently from multiple code paths (AM UI, RM client
callbacks, etc.), and with the remove() logic that was added in SAMZA-913, we
can get ConcurrentModificationExceptions.

Fix:
* Use an AtomicReference to swap in the updated messages when they are ready
* In bootstrap():
  * Acquire a lock
  * Make a copy of the messages
  * Append the new messages
  * Set the atomic reference to the copy
  * Release the lock
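
The steps in the fix can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in the diff: the class name BootstrappedMessages, the use of String as the message type, and the method names are all hypothetical, and the sketch only models appending messages (the remove() logic from SAMZA-913 is left out).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the consumer; not the actual Samza class.
class BootstrappedMessages {
    // Readers only ever see an immutable, fully built snapshot.
    private final AtomicReference<List<String>> messages =
        new AtomicReference<>(Collections.<String>emptyList());

    // Serializes concurrent bootstrap() calls so that no update is lost.
    private final Object bootstrapLock = new Object();

    // Lock-free read; the returned snapshot is never mutated afterwards,
    // so iterating over it cannot throw ConcurrentModificationException.
    List<String> getMessages() {
        return messages.get();
    }

    void bootstrap(List<String> newMessages) {
        synchronized (bootstrapLock) {                            // acquire the lock
            List<String> copy = new ArrayList<>(messages.get());  // copy the messages
            copy.addAll(newMessages);                             // append the new messages
            messages.set(Collections.unmodifiableList(copy));     // swap in the copy
        }                                                         // release the lock
    }
}
```

The design choice here is copy-on-write: writers pay for a full copy on each bootstrap, while readers never block and never observe a partially updated collection.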

Also sneaking in a log message fix for JobCoordinator. It previously didn't 
include the task names.


Diffs
---

  
  samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java 8e1057b4d055159acb49d2cc60d3acad7665a532
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 03f48db7f42b2617995b14cf51248b82b6cc2636

Diff: https://reviews.apache.org/r/47197/diff/


Testing
---

./gradlew build


Thanks,

Jake Maes



Re: Kafka dependency

2016-05-10 Thread Yi Pan
Hi, Nick,

Good point. Sorry, I forgot to push the latest tag out to the remote repo.
Here is the tag that was used to publish all the binary tarballs:
release-0.10.0-rc2

Thanks!

-Yi

On Tue, May 10, 2016 at 11:24 AM, Nick Quinn  wrote:

> Thanks Yi!
>
> One more question, if I may:  I saw that you dropped a 0.10.0 in December
> of 2015, but the version in Github is still 0.10.0-rc. Do you guys plan on
> an official release of 0.10.0 or are you settling for a simple rc release?
> The reason I ask is that it is hard to match the git commits to the release
> when there is no official 0.10.0 release.
>
> Thanks!
> Nick
>
>
> -Original Message-
> From: Yi Pan [mailto:nickpa...@gmail.com]
> Sent: Tuesday, May 10, 2016 11:22 AM
> To: dev@samza.apache.org
> Subject: Re: Kafka dependency
>
> Hi, Nick,
>
> We do have plan to update the Kafka dependency in Samza. However, Samza
> only uses Kafka client library. We have confirmed that any Kafka 0.8.2
> clients should be supported by Kafka 0.9 brokers. Hence, it should not
> block you if you are thinking of upgrading Kafka broker versions (e.g.
> LinkedIn has been running with this combination for a long time). The main
> reason that we are taking a bit careful path on the upgrading of Kafka
> client version is that there are Samza users who are still running Kafka
> 0.8.2 broker version and if we force upgrade the client version to 0.9, it
> would likely not be supported by a lower version of Kafka broker.
>
> If you have any specific use case that requires Kafka client version 0.9
> and above, please speak up. We will put into consideration in our upgrade
> plan and timeline.
>
> Thanks a lot!
>
> -Yi
>
> On Tue, May 10, 2016 at 10:28 AM, Nick Quinn 
> wrote:
>
> > Hi guys-
> >
> >
> >
> > I was wondering why Samza still has a dependency on Kafka 0.8.2. Does
> > your development team have any plans to update the Kafka dependency
> > version that Samza is using?
> >
> > Best,
> >
> > Nick
> >
> >
>


Re: Kafka dependency

2016-05-10 Thread Robert Crim
Just want to chime in to add we'd like to use some of the new security
features in Kafka 0.9, ACLs and TLS in particular.

On Tue, May 10, 2016 at 11:24 AM, Nick Quinn  wrote:

> Thanks Yi!
>
> One more question, if I may:  I saw that you dropped a 0.10.0 in December
> of 2015, but the version in Github is still 0.10.0-rc. Do you guys plan on
> an official release of 0.10.0 or are you settling for a simple rc release?
> The reason I ask is that it is hard to match the git commits to the release
> when there is no official 0.10.0 release.
>
> Thanks!
> Nick
>
>
> -Original Message-
> From: Yi Pan [mailto:nickpa...@gmail.com]
> Sent: Tuesday, May 10, 2016 11:22 AM
> To: dev@samza.apache.org
> Subject: Re: Kafka dependency
>
> Hi, Nick,
>
> We do have plan to update the Kafka dependency in Samza. However, Samza
> only uses Kafka client library. We have confirmed that any Kafka 0.8.2
> clients should be supported by Kafka 0.9 brokers. Hence, it should not
> block you if you are thinking of upgrading Kafka broker versions (e.g.
> LinkedIn has been running with this combination for a long time). The main
> reason that we are taking a bit careful path on the upgrading of Kafka
> client version is that there are Samza users who are still running Kafka
> 0.8.2 broker version and if we force upgrade the client version to 0.9, it
> would likely not be supported by a lower version of Kafka broker.
>
> If you have any specific use case that requires Kafka client version 0.9
> and above, please speak up. We will put into consideration in our upgrade
> plan and timeline.
>
> Thanks a lot!
>
> -Yi
>
> On Tue, May 10, 2016 at 10:28 AM, Nick Quinn 
> wrote:
>
> > Hi guys-
> >
> >
> >
> > I was wondering why Samza still has a dependency on Kafka 0.8.2. Does
> > your development team have any plans to update the Kafka dependency
> > version that Samza is using?
> >
> > Best,
> >
> > Nick
> >
> >
>


RE: Kafka dependency

2016-05-10 Thread Nick Quinn
Thanks Yi! 

One more question, if I may: I saw that you dropped a 0.10.0 in December of
2015, but the version in GitHub is still 0.10.0-rc. Do you guys plan on an
official release of 0.10.0, or are you settling for a simple rc release? The
reason I ask is that it is hard to match the git commits to the release when
there is no official 0.10.0 release.

Thanks!
Nick


-----Original Message-----
From: Yi Pan [mailto:nickpa...@gmail.com] 
Sent: Tuesday, May 10, 2016 11:22 AM
To: dev@samza.apache.org
Subject: Re: Kafka dependency

Hi, Nick,

We do have plan to update the Kafka dependency in Samza. However, Samza only 
uses Kafka client library. We have confirmed that any Kafka 0.8.2 clients 
should be supported by Kafka 0.9 brokers. Hence, it should not block you if you 
are thinking of upgrading Kafka broker versions (e.g.
LinkedIn has been running with this combination for a long time). The main 
reason that we are taking a bit careful path on the upgrading of Kafka client 
version is that there are Samza users who are still running Kafka
0.8.2 broker version and if we force upgrade the client version to 0.9, it 
would likely not be supported by a lower version of Kafka broker.

If you have any specific use case that requires Kafka client version 0.9 and 
above, please speak up. We will put into consideration in our upgrade plan and 
timeline.

Thanks a lot!

-Yi

On Tue, May 10, 2016 at 10:28 AM, Nick Quinn  wrote:

> Hi guys-
>
>
>
> I was wondering why Samza still has a dependency on Kafka 0.8.2. Does 
> your development team have any plans to update the Kafka dependency 
> version that Samza is using?
>
> Best,
>
> Nick
>
>


Re: Kafka dependency

2016-05-10 Thread Yi Pan
Hi, Nick,

We do have a plan to update the Kafka dependency in Samza. However, Samza
only uses the Kafka client library. We have confirmed that any Kafka 0.8.2
clients should be supported by Kafka 0.9 brokers. Hence, it should not
block you if you are thinking of upgrading Kafka broker versions (e.g.
LinkedIn has been running with this combination for a long time). The main
reason that we are taking a somewhat careful path in upgrading the Kafka
client version is that there are Samza users who are still running the Kafka
0.8.2 broker version, and if we force-upgrade the client version to 0.9, it
would likely not be supported by a lower version of the Kafka broker.

If you have any specific use case that requires Kafka client version 0.9
or above, please speak up. We will take it into consideration in our upgrade
plan and timeline.

Thanks a lot!

-Yi

On Tue, May 10, 2016 at 10:28 AM, Nick Quinn  wrote:

> Hi guys-
>
>
>
> I was wondering why Samza still has a dependency on Kafka 0.8.2. Does your
> development team have any plans to update the Kafka dependency version that
> Samza is using?
>
> Best,
>
> Nick
>
>


Kafka dependency

2016-05-10 Thread Nick Quinn
Hi guys-



I was wondering why Samza still has a dependency on Kafka 0.8.2. Does your 
development team have any plans to update the Kafka dependency version that 
Samza is using?

Best,

Nick



Re: Howto Use The Elasticsearch Producer

2016-05-10 Thread Jacob Maes
Glad to hear it!

I'll add a JIRA task to always log the exception. We shouldn't need to
enable debug logging to fix issues like this.
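
To make the idea concrete, here is a minimal Java sketch of the same skip-on-failure pattern with the exception passed to the logger at WARNING level, so the cause is visible at default log levels. It is only an illustration under stated assumptions: ProducerLoader and createProducers are hypothetical names, java.util.logging is used just to keep the example self-contained, and none of this is the actual SamzaContainer code or the eventual fix.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper for illustration; not Samza code.
final class ProducerLoader {
    private static final Logger LOG = Logger.getLogger(ProducerLoader.class.getName());

    // Builds one producer per system and skips any system whose factory fails.
    static <P> Map<String, P> createProducers(Map<String, Supplier<P>> factories) {
        Map<String, P> producers = new LinkedHashMap<>();
        for (Map.Entry<String, Supplier<P>> entry : factories.entrySet()) {
            try {
                producers.put(entry.getKey(), entry.getValue().get());
            } catch (RuntimeException e) {
                // Pass the exception itself so the message and stack trace reach
                // the log without anyone having to enable debug logging.
                LOG.log(Level.WARNING,
                    "Failed to create a producer for " + entry.getKey() + ", so skipping.", e);
            }
        }
        return producers;
    }
}
```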

On Tue, May 10, 2016 at 10:16 AM, Tim Washington <
tim.washing...@fundingcircle.com> wrote:

> Hey Jacob, right you are.
>
> I added a "**"
> logback configuration. And lo and behold, this exception detail popped up.
>
> 10:05:47.520 [main] DEBUG o.a.samza.container.SamzaContainer$ - Exception
> detail:
> java.lang.RuntimeException: Could not instantiate class
> *org.apache.samza.system.elasticsearch.clientTransport.ClientFactory*
> at
>
> org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory.getClient(ElasticsearchSystemFactory.java:79)
> at
>
> org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory.getProducer(ElasticsearchSystemFactory.java:54)
> at
>
> org.apache.samza.container.SamzaContainer$$anonfun$13.apply(SamzaContainer.scala:195)
> ...
> at clojure.lang.Var.invoke(Var.java:383)
> at clojure.lang.AFn.applyToHelper(AFn.java:156)
> at clojure.lang.Var.applyTo(Var.java:700)
> at clojure.main.main(main.java:37)
>
>
>
> I fixed the typo (in bold red), and that solved the problem :)
>
> Cheers
>
> Tim
>
>
> On 10 May 2016 at 09:08, Jacob Maes  wrote:
>
> > Hey Tim,
> >
> > Can you include the full log? Since the producer is not showing up, I'd
> > expect to see some earlier messages explaining why. In particular, this
> > section of code from SamzaContainer.scala should print errors
> instantiating
> > the producers:
> >
> > val producers = systemFactories
> > >   .map {
> > > case (systemName, systemFactory) =>
> > >   try {
> > > (systemName, systemFactory.getProducer(systemName, config,
> > > samzaContainerMetrics.registry))
> > >   } catch {
> > > case e: Exception =>
> > >   info("Failed to create a producer for %s, so skipping."
> > > format systemName)
> > >   debug("Exception detail:", e)
> > >   (systemName, null)
> > >   }
> > >   }
> > >   .filter(_._2 != null)
> > >   .toMap
> > > info("Got system producers: %s" format producers.keys)
> >
> >
> > And from the looks of it, it would be a good idea to enable debug logging
> > at least for org.apache.samza.container, so we can see the exception
> > detail.
> >
> > Thanks,
> > Jake
> >
> > On Tue, May 10, 2016 at 8:36 AM, Tim Washington <
> > tim.washing...@fundingcircle.com> wrote:
> >
> > > *Tim Washington* | *Senior Software Engineer*
> > >
> > > *tim.washing...@fundingcircle.com *
> > >
> > > 747 Front St, 4th Fl | San Francisco, CA 94111
> > >
> > > *Our Mission: **T**o build a better financial world*
> > >
> > >
> > >
> > >
> > > -- Forwarded message --
> > > From: Tim Washington 
> > > Date: 9 May 2016 at 15:40
> > > Subject: Howto Use The Elasticsearch Producer
> > > To: dev-subscr...@samza.apache.org
> > > Cc: Andy Chambers , Charles Reese <
> > > charles.re...@fundingcircle.com>
> > >
> > >
> > > Hey guys,
> > >
> > > I'm trying to use the Elasticsearch Producer, as described in this
> > message
> > > . But I run into an
> > > exception,
> > > trying to write out the message.
> > >
> > > 15:15:59.586 [ThreadJob] INFO  o.a.samza.container.SamzaContainer -
> > > Entering run loop.
> > > 15:16:13.998 [ThreadJob] INFO  o.a.samza.container.TaskInstance -
> > > SystemStreamPartition [kafka, ledger, 0] is catched up.
> > > 15:16:14.006 [ThreadJob] INFO  ledger.elastic.job -
> > > #object[org.apache.samza.container.TaskInstance$$anon$1 0x20904646
> > > org.apache.samza.container.TaskInstance$$anon$1@20904646]
> > > 15:16:14.011 [ThreadJob] INFO  ledger.elastic.job - {:class
> > > org.apache.samza.system.IncomingMessageEnvelope, :key nil, :message
> > > :qwerty, :offset 5, :size 0, :system-stream-partition {:class
> > > org.apache.samza.system.SystemStreamPartition, :partition {:class
> > > org.apache.samza.Partition, :partition-id 0}, :stream ledger, :system
> > > kafka, :system-stream {:class org.apache.samza.system.SystemStream,
> > :stream
> > > ledger, :system kafka}}}
> > > 15:16:14.016 [ThreadJob] ERROR o.a.samza.container.SamzaContainer -
> > Caught
> > > exception in process loop.
> > > *org.apache.samza.SamzaException: Attempting to produce to unknown
> > system:
> > > elasticsearch. Available systems: Set(kafka). Please add the system to
> > your
> > > configuration, or update outgoing message envelope to send to a defined
> > > system.*
> > > at
> > >
> > >
> >
> org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
> > > at
> > >
> > >
> >
> org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
> > > ...
> > > at
> > org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
> > > at 

Re: Howto Use The Elasticsearch Producer

2016-05-10 Thread Tim Washington
Hey Jacob, right you are.

I added a "**"
logback configuration. And lo and behold, this exception detail popped up.

10:05:47.520 [main] DEBUG o.a.samza.container.SamzaContainer$ - Exception
detail:
java.lang.RuntimeException: Could not instantiate class
*org.apache.samza.system.elasticsearch.clientTransport.ClientFactory*
at
org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory.getClient(ElasticsearchSystemFactory.java:79)
at
org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory.getProducer(ElasticsearchSystemFactory.java:54)
at
org.apache.samza.container.SamzaContainer$$anonfun$13.apply(SamzaContainer.scala:195)
...
at clojure.lang.Var.invoke(Var.java:383)
at clojure.lang.AFn.applyToHelper(AFn.java:156)
at clojure.lang.Var.applyTo(Var.java:700)
at clojure.main.main(main.java:37)



I fixed the typo in the factory class name (highlighted with asterisks above), and that solved the problem :)
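
One way to catch this kind of typo before the job starts is to check that every factory-style config value names a loadable class. The sketch below is a hypothetical helper, not part of Samza: the class FactoryConfigCheck, the method unloadableFactories, and the rule of checking keys that end in ".factory" or ".class" are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical pre-flight check; not part of Samza.
final class FactoryConfigCheck {
    // Returns "key=value" for every *.factory or *.class key whose value
    // does not name a class that can be loaded from the classpath.
    static List<String> unloadableFactories(Properties config) {
        List<String> problems = new ArrayList<>();
        // TreeSet gives a stable, sorted order for the report.
        for (String key : new TreeSet<>(config.stringPropertyNames())) {
            if (!key.endsWith(".factory") && !key.endsWith(".class")) {
                continue; // not a class-name setting under this sketch's assumption
            }
            String className = config.getProperty(key);
            try {
                // Load without initializing, so static initializers do not run.
                Class.forName(className, false, FactoryConfigCheck.class.getClassLoader());
            } catch (ClassNotFoundException | LinkageError e) {
                problems.add(key + "=" + className);
            }
        }
        return problems;
    }
}
```

Running a check like this against the job config at startup would report the misspelled factory entry directly, instead of the producer being silently skipped.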

Cheers

Tim


On 10 May 2016 at 09:08, Jacob Maes  wrote:

> Hey Tim,
>
> Can you include the full log? Since the producer is not showing up, I'd
> expect to see some earlier messages explaining why. In particular, this
> section of code from SamzaContainer.scala should print errors instantiating
> the producers:
>
> val producers = systemFactories
> >   .map {
> > case (systemName, systemFactory) =>
> >   try {
> > (systemName, systemFactory.getProducer(systemName, config,
> > samzaContainerMetrics.registry))
> >   } catch {
> > case e: Exception =>
> >   info("Failed to create a producer for %s, so skipping."
> > format systemName)
> >   debug("Exception detail:", e)
> >   (systemName, null)
> >   }
> >   }
> >   .filter(_._2 != null)
> >   .toMap
> > info("Got system producers: %s" format producers.keys)
>
>
> And from the looks of it, it would be a good idea to enable debug logging
> at least for org.apache.samza.container, so we can see the exception
> detail.
>
> Thanks,
> Jake
>
> On Tue, May 10, 2016 at 8:36 AM, Tim Washington <
> tim.washing...@fundingcircle.com> wrote:
>
> > *Tim Washington* | *Senior Software Engineer*
> >
> > *tim.washing...@fundingcircle.com *
> >
> > 747 Front St, 4th Fl | San Francisco, CA 94111
> >
> > *Our Mission: **T**o build a better financial world*
> >
> >
> >
> >
> > -- Forwarded message --
> > From: Tim Washington 
> > Date: 9 May 2016 at 15:40
> > Subject: Howto Use The Elasticsearch Producer
> > To: dev-subscr...@samza.apache.org
> > Cc: Andy Chambers , Charles Reese <
> > charles.re...@fundingcircle.com>
> >
> >
> > Hey guys,
> >
> > I'm trying to use the Elasticsearch Producer, as described in this
> message
> > . But I run into an
> > exception,
> > trying to write out the message.
> >
> > 15:15:59.586 [ThreadJob] INFO  o.a.samza.container.SamzaContainer -
> > Entering run loop.
> > 15:16:13.998 [ThreadJob] INFO  o.a.samza.container.TaskInstance -
> > SystemStreamPartition [kafka, ledger, 0] is catched up.
> > 15:16:14.006 [ThreadJob] INFO  ledger.elastic.job -
> > #object[org.apache.samza.container.TaskInstance$$anon$1 0x20904646
> > org.apache.samza.container.TaskInstance$$anon$1@20904646]
> > 15:16:14.011 [ThreadJob] INFO  ledger.elastic.job - {:class
> > org.apache.samza.system.IncomingMessageEnvelope, :key nil, :message
> > :qwerty, :offset 5, :size 0, :system-stream-partition {:class
> > org.apache.samza.system.SystemStreamPartition, :partition {:class
> > org.apache.samza.Partition, :partition-id 0}, :stream ledger, :system
> > kafka, :system-stream {:class org.apache.samza.system.SystemStream,
> :stream
> > ledger, :system kafka}}}
> > 15:16:14.016 [ThreadJob] ERROR o.a.samza.container.SamzaContainer -
> Caught
> > exception in process loop.
> > *org.apache.samza.SamzaException: Attempting to produce to unknown
> system:
> > elasticsearch. Available systems: Set(kafka). Please add the system to
> your
> > configuration, or update outgoing message envelope to send to a defined
> > system.*
> > at
> >
> >
> org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
> > at
> >
> >
> org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
> > ...
> > at
> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
> > at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
> >
> >
> > But the source of
> > <
> >
> https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala#L86
> > >
> > the stack trace line, tells me that the code can't find a *producer* in
> the
> > elasticsearch system. The code that I'm using is straight out of the
> > example.
> >
> > collector.send(new OutgoingMessageEnvelope(new
> > SystemStream("elasticsearch", ELASTICSEARCH_INDEX + "/" +
> > 

Re: Review Request 46287: Add a double serde.

2016-05-10 Thread Yi Pan (Data Infrastructure)

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/46287/#review132481
---


Ship it!




+1. lgtm!

- Yi Pan (Data Infrastructure)


On April 15, 2016, 11:17 p.m., Jon Bringhurst wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/46287/
> ---
> 
> (Updated April 15, 2016, 11:17 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-936
> https://issues.apache.org/jira/browse/SAMZA-936
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Add a simple double serde.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/scala/org/apache/samza/serializers/DoubleSerde.scala 
> PRE-CREATION 
>   
> samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala 
> PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/46287/diff/
> 
> 
> Testing
> ---
> 
> A simple unit test was added.
> 
> 
> Thanks,
> 
> Jon Bringhurst
> 
>



Re: Howto Use The Elasticsearch Producer

2016-05-10 Thread Jacob Maes
Hey Tim,

Can you include the full log? Since the producer is not showing up, I'd
expect to see some earlier messages explaining why. In particular, this
section of code from SamzaContainer.scala should print errors instantiating
the producers:

    val producers = systemFactories
      .map {
        case (systemName, systemFactory) =>
          try {
            (systemName, systemFactory.getProducer(systemName, config, samzaContainerMetrics.registry))
          } catch {
            case e: Exception =>
              info("Failed to create a producer for %s, so skipping." format systemName)
              debug("Exception detail:", e)
              (systemName, null)
          }
      }
      .filter(_._2 != null)
      .toMap
    info("Got system producers: %s" format producers.keys)


And from the looks of it, it would be a good idea to enable debug logging
at least for org.apache.samza.container, so we can see the exception
detail.

Thanks,
Jake

On Tue, May 10, 2016 at 8:36 AM, Tim Washington <
tim.washing...@fundingcircle.com> wrote:

> *Tim Washington* | *Senior Software Engineer*
>
> *tim.washing...@fundingcircle.com *
>
> 747 Front St, 4th Fl | San Francisco, CA 94111
>
> *Our Mission: **T**o build a better financial world*
>
>
>
>
> -- Forwarded message --
> From: Tim Washington 
> Date: 9 May 2016 at 15:40
> Subject: Howto Use The Elasticsearch Producer
> To: dev-subscr...@samza.apache.org
> Cc: Andy Chambers , Charles Reese <
> charles.re...@fundingcircle.com>
>
>
> Hey guys,
>
> I'm trying to use the Elasticsearch Producer, as described in this message
> . But I run into an
> exception,
> trying to write out the message.
>
> 15:15:59.586 [ThreadJob] INFO  o.a.samza.container.SamzaContainer -
> Entering run loop.
> 15:16:13.998 [ThreadJob] INFO  o.a.samza.container.TaskInstance -
> SystemStreamPartition [kafka, ledger, 0] is catched up.
> 15:16:14.006 [ThreadJob] INFO  ledger.elastic.job -
> #object[org.apache.samza.container.TaskInstance$$anon$1 0x20904646
> org.apache.samza.container.TaskInstance$$anon$1@20904646]
> 15:16:14.011 [ThreadJob] INFO  ledger.elastic.job - {:class
> org.apache.samza.system.IncomingMessageEnvelope, :key nil, :message
> :qwerty, :offset 5, :size 0, :system-stream-partition {:class
> org.apache.samza.system.SystemStreamPartition, :partition {:class
> org.apache.samza.Partition, :partition-id 0}, :stream ledger, :system
> kafka, :system-stream {:class org.apache.samza.system.SystemStream, :stream
> ledger, :system kafka}}}
> 15:16:14.016 [ThreadJob] ERROR o.a.samza.container.SamzaContainer - Caught
> exception in process loop.
> *org.apache.samza.SamzaException: Attempting to produce to unknown system:
> elasticsearch. Available systems: Set(kafka). Please add the system to your
> configuration, or update outgoing message envelope to send to a defined
> system.*
> at
>
> org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
> at
>
> org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
> ...
> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
> at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
>
>
> But the source of
> <
> https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala#L86
> >
> the stack trace line, tells me that the code can't find a *producer* in the
> elasticsearch system. The code that I'm using is straight out of the
> example.
>
> collector.send(new OutgoingMessageEnvelope(new
> SystemStream("elasticsearch", ELASTICSEARCH_INDEX + "/" +
> ELASTICSEARCH_TYPE), parsedJsonObject));
>
>
> And if we print out the config, I'm indeed seeing *elasticsearch* as a
> system.
>
>
> 14:09:19.978 [main] INFO  ledger.run - ;;
> ;;  ___  __ _ _ __ ___  __ _
> ;; / __|/ _` | '_ ` _ \|_  / _` |
> ;; \__ \ (_| | | | | | |/ / (_| |
> ;; |___/\__,_|_| |_| |_/___\__,_|
> ;;
>
> 14:09:19.980 [main] INFO  ledger.run - ;; >>> ledger-elastic-connector
> <<<
> 14:09:19.980 [main] INFO  ledger.run - ;;
> 14:09:19.981 [main] INFO  ledger.run - ;;
> job.coordinator.replication.factor=1
> 14:09:19.981 [main] INFO  ledger.run - ;; job.coordinator.system=kafka
> 14:09:19.981 [main] INFO  ledger.run - ;;
> job.factory.class=org.apache.samza.job.local.ThreadJobFactory
> 14:09:19.981 [main] INFO  ledger.run - ;; job.name
> =ledger-elastic-connector
> 14:09:19.981 [main] INFO  ledger.run - ;;
> samja.task.factory.function=ledger.elastic.job/elastic-connector
> 14:09:19.981 [main] INFO  ledger.run - ;;
> serializers.registry.edn.class=samja.core.EDNSerdeFactory
> 14:09:19.981 [main] INFO  ledger.run - ;;
> systems.elasticsearch.client.elasticsearch.cluster.name=elasticsearch
> 14:09:19.982 [main] INFO  ledger.run - ;;
>
> 

Fwd: Howto Use The Elasticsearch Producer

2016-05-10 Thread Tim Washington
*Tim Washington* | *Senior Software Engineer*

*tim.washing...@fundingcircle.com *

747 Front St, 4th Fl | San Francisco, CA 94111

*Our Mission: **T**o build a better financial world*




---------- Forwarded message ----------
From: Tim Washington 
Date: 9 May 2016 at 15:40
Subject: Howto Use The Elasticsearch Producer
To: dev-subscr...@samza.apache.org
Cc: Andy Chambers , Charles Reese <
charles.re...@fundingcircle.com>


Hey guys,

I'm trying to use the Elasticsearch Producer, as described in this message
. But I run into an exception,
trying to write out the message.

15:15:59.586 [ThreadJob] INFO  o.a.samza.container.SamzaContainer -
Entering run loop.
15:16:13.998 [ThreadJob] INFO  o.a.samza.container.TaskInstance -
SystemStreamPartition [kafka, ledger, 0] is catched up.
15:16:14.006 [ThreadJob] INFO  ledger.elastic.job -
#object[org.apache.samza.container.TaskInstance$$anon$1 0x20904646
org.apache.samza.container.TaskInstance$$anon$1@20904646]
15:16:14.011 [ThreadJob] INFO  ledger.elastic.job - {:class
org.apache.samza.system.IncomingMessageEnvelope, :key nil, :message
:qwerty, :offset 5, :size 0, :system-stream-partition {:class
org.apache.samza.system.SystemStreamPartition, :partition {:class
org.apache.samza.Partition, :partition-id 0}, :stream ledger, :system
kafka, :system-stream {:class org.apache.samza.system.SystemStream, :stream
ledger, :system kafka}}}
15:16:14.016 [ThreadJob] ERROR o.a.samza.container.SamzaContainer - Caught
exception in process loop.
*org.apache.samza.SamzaException: Attempting to produce to unknown system:
elasticsearch. Available systems: Set(kafka). Please add the system to your
configuration, or update outgoing message envelope to send to a defined
system.*
at
org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
at
org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
...
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)


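But the source of the stack trace line
(https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala#L86)
tells me that the code can't find a *producer* in the elasticsearch system.
The code that I'm using is straight out of the example.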

collector.send(new OutgoingMessageEnvelope(new
SystemStream("elasticsearch", ELASTICSEARCH_INDEX + "/" +
ELASTICSEARCH_TYPE), parsedJsonObject));


And if we print out the config, I'm indeed seeing *elasticsearch* as a
system.


14:09:19.978 [main] INFO  ledger.run - ;;
;;  ___  __ _ _ __ ___  __ _
;; / __|/ _` | '_ ` _ \|_  / _` |
;; \__ \ (_| | | | | | |/ / (_| |
;; |___/\__,_|_| |_| |_/___\__,_|
;;

14:09:19.980 [main] INFO  ledger.run - ;; >>> ledger-elastic-connector <<<
14:09:19.980 [main] INFO  ledger.run - ;;
14:09:19.981 [main] INFO  ledger.run - ;; job.coordinator.replication.factor=1
14:09:19.981 [main] INFO  ledger.run - ;; job.coordinator.system=kafka
14:09:19.981 [main] INFO  ledger.run - ;; job.factory.class=org.apache.samza.job.local.ThreadJobFactory
14:09:19.981 [main] INFO  ledger.run - ;; job.name=ledger-elastic-connector
14:09:19.981 [main] INFO  ledger.run - ;; samja.task.factory.function=ledger.elastic.job/elastic-connector
14:09:19.981 [main] INFO  ledger.run - ;; serializers.registry.edn.class=samja.core.EDNSerdeFactory
14:09:19.981 [main] INFO  ledger.run - ;; systems.elasticsearch.client.elasticsearch.cluster.name=elasticsearch
14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.client.factory=org.apache.samza.system.elasticsearch.clientTransport.ClientFactory
14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.client.transport.host=localhost
14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.client.transport.port=9300
14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.index.request.factory=org.apache.samza.system.elasticsearch.indexrequest.DefaultIndexRequestFactory
14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.samza.factory=org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory
14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.consumer.zookeeper.connect=192.168.99.100:2181
14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.producer.bootstrap.servers=192.168.99.100:9092
14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.samza.key.serde=edn
14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.samza.msg.serde=edn
14:09:19.982 [main] INFO  ledger.run - ;; task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
14:09:19.982 [main] INFO  ledger.run - ;;