Re: [Discuss] Moving Samza to Java 1.8 source compatibility.

2016-04-28 Thread Roger Hoover
+1 for me.  We're already using Java 8 in production.

On Thu, Apr 28, 2016 at 10:45 AM, Yi Pan  wrote:

> I am +1 on the JDK8 move. As Jake has elaborated, there are numerous
> advantages from 1.8 source compatible code.
>
> As for the downside of dropping JDK7 support, obviously, binary
> backward-compatibility will be broken. However, moving to a JDK8 binary is
> not a big effort for JDK7-compatible Java and Scala source code, in terms
> of compiling and packaging. There is no need for source code changes, and we
> have been building JDK8 binaries at LinkedIn and running in production w/
> JDK8 for a long time w/o seeing any issues.
>
> For users who cannot upgrade their runtime JVM version to JDK8 easily, the
> upcoming release will still be on JDK7. The question is: how long should
> we hold back waiting for this upgrade?
>
> Thanks!
>
> -Yi
>
> On Wed, Apr 27, 2016 at 6:27 PM, Jacob Maes  wrote:
>
> > Hey everyone,
> >
> > I wanted to start a discussion to see what folks think about moving to
> Java
> 1.8 source compatibility at some point after the 0.10.1 release.
> >
> > Java 8 has a number of nice features that can help us build more concise,
> > maintainable, and robust software. A few notable features that would
> > benefit Samza:
> > 1. Stream API - provides a compact syntax for expressing transformations
> > on collections. These may be foundational for future API work including
> > Operators (SAMZA-914)
> > 2. Default Methods - enable us to evolve interfaces without breaking
> > compatibility
> > 3. Concurrent package enhancements - generally make concurrent
> programming
> > easier, which will be more important with features like multithreading
> > support (SAMZA-863)
> > 4. Lambdas - love them or hate them, they do reduce the amount of
> > boilerplate code, especially when used in place of anonymous classes.
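
A minimal sketch of the boilerplate reduction items 1 and 4 refer to (illustrative code only, not from the Samza codebase):

```java
// Illustrative sketch: the same callback written as an anonymous class
// (Java 7 style) and as a lambda plus Stream API (Java 8 style).
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class LambdaSketch {
  public static void main(String[] args) {
    List<String> topics = Arrays.asList("events", "metrics", "logs");

    // Java 7 style: anonymous class boilerplate.
    topics.forEach(new Consumer<String>() {
      @Override
      public void accept(String topic) {
        System.out.println("subscribing to " + topic);
      }
    });

    // Java 8 style: lambda and a simple Stream transformation.
    topics.stream()
        .filter(t -> t.startsWith("e"))
        .map(String::toUpperCase)
        .forEach(t -> System.out.println("subscribing to " + t));
  }
}
```
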
> >
> > It certainly would be nice to leverage some of the features above.
> However,
> > we have historically supported Java versions N and N-1 and it doesn't
> look
> > like Java 9 is coming until next year. So, discontinuing support for Java
> > 1.7 at this point would be a departure from our normal support matrix
> for a
> > significant period of time. Thoughts on the pros and cons?
> >
> > I know some folks in this community are still on Java 1.7. How many of
> you
> > stay up to date with the latest Samza? Do you have a roadmap to move to
> > Java 1.8?
> >
> > Thanks,
> > Jake
> >
>


Re: ThreadJobFactory in production

2016-03-02 Thread Roger Hoover
Jose,

It would be great if you could share it.  I'm interested in trying to use
it as well.

Thanks,

Roger

On Wed, Mar 2, 2016 at 2:31 PM, José Barrueta  wrote:

> Hi guys,
>
> At Stormpath, we made a custom Samza 0.10 version merging SAMZA-41 into it,
> it's working well, so we are thinking of updating that patch later this week
> so it can be added to the main project.
>
> HTH,
>
> Jose Luis Barrueta
>
> On Wed, Mar 2, 2016 at 2:11 PM, Yi Pan  wrote:
>
> > Hi, Robert,
> >
> > The main reason that ThreadJobFactory and ProcessJobFactory are not
> > considered "production-ready" is that there is only one container for the
> > job and all tasks are assigned to the single container. Hence, it is not
> > easy to scale out beyond a single host.
> >
> > As Rick mentioned, Netflix has put up a patch in SAMZA-41 based on 0.9.1
> > to allow static assignment of a subset of partitions to a single
> > ProcessJob, which allows launching multiple ProcessJobs on different
> > hosts. We planned to merge it into 0.10. But it turns out that too many
> > changes have gone into 0.10 and it became difficult to merge the patch.
> > At this point, we can still try the following two options:
> > 1) We can attempt to merge SAMZA-41 into 0.10.1 again; it may take some
> > effort but would give a stop-gap solution.
> > 2) We are working on a standalone Samza model (SAMZA-516, SAMZA-881) to
> > allow users to run Samza w/o depending on YarnJobFactory. This is a
> > long-term effort and will take some time to flesh out. Please join the
> > discussion there so that we can be more aligned in our effort.
> >
> > Hope the above gives you an overall picture on where we are going.
> >
> > Thanks a lot!
> >
> > -Yi
> >
> > On Wed, Mar 2, 2016 at 1:28 PM, Rick Mangi  wrote:
> >
> > > There was an interesting thread a while back from I believe the netflix
> > > guys about running ThreadJobFactory in production.
> > >
> > >
> > > > On Mar 2, 2016, at 4:20 PM, Robert Crim  wrote:
> > > >
> > > > Hi,
> > > >
> > > > We're currently working on a solution that allows us to run Samza
> > > > jobs on Mesos. This seems to be going well, and it is something we'd
> > > > like to move away from when native Mesos support is added to Samza.
> > > >
> > > > While we're developing and testing our scheduler, I'm wondering about
> > the
> > > > implications of running tasks with the ThreadJobFactory in
> > "production".
> > > > The documentation advises against this, but it's not clear why.
> > > >
> > > > If we were using the ThreadJobFactory inside of a docker container on
> > > > Mesos with Marathon for production, what would be our main problem?
> > > > These are not particularly high-load tasks. Aside from not being able
> > > > to get fine-grained resource scheduling per-task, it seems like the
> > > > main issue is not being able to easily tell when a job stops due to an
> > > > error / exception.
> > > >
> > > > In other words, what would be the show-stopping reasons to not use the
> > > > ThreadJobFactory in production?
> > > >
> > > > Thanks,
> > > > Rob
> > >
> > >
> >
>


Re: [DISCUSS] Moving to github/pull-request for code review and check-in

2016-02-18 Thread Roger Hoover
+1 - Thanks for bringing this up, Yi.  I've done it both ways and feel pull 
requests are much easier.

Sent from my iPhone

> On Feb 18, 2016, at 4:25 PM, Navina Ramesh  
> wrote:
> 
> +1
> 
> Haven't tried any contribution with pull requests. But sounds simpler than
> attaching the patch to JIRA.
> 
> Navina
> 
>> On Thu, Feb 18, 2016 at 4:01 PM, Jacob Maes  wrote:
>> 
>> +1
>> 
>> As a relatively new contributor to Samza, I've certainly felt the current
>> process was overly-complicated.
>> 
>>> On Thu, Feb 18, 2016 at 3:53 PM, Yi Pan  wrote:
>>> 
>>> Hi, all,
>>> 
>>> I want to start the discussion on our code review/commit process.
>>> 
>>> I felt that our code review and check-in process is a little bit
>>> cumbersome:
>>> - developers need to create RBs and attach diff to JIRA
>>> - committers need to review RBs, download the diff and apply it, then push.
>>> 
>>> It would be much lighter if we take the pull request only approach, as
>>> Kafka already converted to:
>>> - for the developers, the only thing needed is to open a pull request.
>>> - for committers, review and apply patch is from the same PR and merge
>> can
>>> be done directly on remote git repo.
>>> 
>>> Of course, there might be some hookup scripts that we will need to link
>>> JIRA w/ pull requests in github, which Kafka already does. Any comments
>>> and feedback are welcome!
>>> 
>>> Thanks!
>>> 
>>> -Yi
> 
> 
> 
> -- 
> Navina R.


Re: ElasticsearchSystemProducer Crashes Samza Job

2016-02-16 Thread Roger Hoover
The code that you showed below is part of the BulkProcessor.Listener
interface so if that listener were pluggable, you could override the
default behavior (which is to only ignore version conflicts).
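
A hedged sketch of what a pluggable policy along those lines could look like (the interface, class, and any config wiring here are hypothetical, not existing Samza APIs):

```java
// Hypothetical sketch of a pluggable bulk-failure policy for the system producer.
// None of these names exist in Samza today; they only illustrate the idea.
public interface BulkItemFailurePolicy {
  /** Return true if the failed bulk item should abort the job, false to log and continue. */
  boolean isFatal(int httpStatus, String failureMessage);
}

// Example policy: only treat 5xx responses as fatal; tolerate mapping errors (400)
// and version conflicts (409), logging them instead.
class TolerantPolicy implements BulkItemFailurePolicy {
  @Override
  public boolean isFatal(int httpStatus, String failureMessage) {
    return httpStatus >= 500;
  }
}
```

Such a policy could then be supplied through a config option, similar in spirit to how the IndexRequestFactory is already pluggable.
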



On Tue, Feb 16, 2016 at 12:27 PM, jeremiah adams 
wrote:

> The root of the issue may be in the HTTP status code handling. This code
> seems to imply that the only valid error case from Elasticsearch is
> conflict. This is too narrow of a constraint. In one of my use cases, a
> mapping/message conflict occurs resulting in an HTTP 400. In my case, it is
> perfectly reasonable to log the error, not raise hasFatalError = true and
> continue processing.  A way to control this via properties or some other
> mechanism would probably solve the issue. Setting the hasFatalError flag
> looks to be the source of the unhandled exception that ultimately fails the
> job.
>
> I don't think the ListenerCallback will solve the problem. I don't
> understand how that might stop the unhandled exception being raised by
> flush().
>
>   if (itemResp.getFailure().getStatus().equals(RestStatus.CONFLICT)) {
>  LOGGER.info("Failed to index document in Elasticsearch: " +
> itemResp.getFailureMessage());
>} else {
>  hasFatalError = true;
>  LOGGER.error("Failed to index document in Elasticsearch: " +
> itemResp.getFailureMessage());
>}
>
> - jeremiah
>
> On Tue, Feb 16, 2016 at 12:18 PM, Roger Hoover 
> wrote:
>
> > Hi Jeremiah,
> >
> > There's currently no way to do that.  I think the best way to modify the
> > existing ElasticsearchSystemProducer would be to add a config option for
> a
> > callback to let you customize this behavior.  Basically, a pluggable
> > listener (
> >
> >
> https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java#L101
> > ).
> >
> >
> >
> > On Mon, Feb 15, 2016 at 2:30 PM, jeremiah adams 
> > wrote:
> >
> > > We have a samza job configured to run in a yarn cluster. This job
> > consumes
> > > multiple kafka topics and routes the messages to elasticsearch for
> > > indexing. When enough batch-updates to elasticsearch fail using the
> > > ElasticsearchSystemProducer, the entire samza job dies. Due to
> > > checkpointing + yarn, the job starts back up, starts reading where it
> > > left off and dies again. Enter loop.
> > >
> > > Updates to ES are failing due to invalid data on the part of our
> > > consumers, but I can't always control them, so I need to be defensive
> > > about the code.
> I
> > > don't see how to handle this in any of the source examples. I would
> like
> > to
> > > just trap this error and if it is what I expect it to be - squash it.
> Can
> > > someone point me in the right direction?
> > >
> > > Below is the log where the failure occurs.
> > >
> > > 2016-02-15 18:55:26 ElasticsearchSystemProducer [ERROR] Unable to send
> > > message from TaskName-Partition 5 to system elastic.
> > > 2016-02-15 18:55:26 SamzaContainerExceptionHandler [ERROR] Uncaught
> > > exception in thread (name=main). Exiting process now.
> > > org.apache.samza.SamzaException: Unable to send message from
> > > TaskName-Partition 5 to system elastic.
> > > at
> > >
> > >
> >
> org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.flush(ElasticsearchSystemProducer.java:186)
> > > at
> > >
> > >
> >
> org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.stop(ElasticsearchSystemProducer.java:92)
> > > at
> > >
> > >
> >
> org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
> > > at
> > >
> > >
> >
> org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
> > > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > > at
> > >
> scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> > > at
> org.apache.samza.system.SystemProducers.stop(SystemProducers.scala:47)
> > > at
> > >
> > >
> >
> org.apache.samza.container.SamzaContainer.shutdownProducers(SamzaContainer.scala:672)
> > > at
> > org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:564)
> > > at
> > >
> > >
> >
> org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92)
> > > at
> > org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66)
> > > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> > > 2016-02-15 18:55:26 RunLoop [INFO] Shutting down, will wait up to 5000
> ms
> > > 2016-02-15 18:55:31 RunLoop [WARN] Did not shut down within 5000 ms,
> > > exiting
> > >
> > > - jeremiah
> > >
> >
>


Re: ElasticsearchSystemProducer Crashes Samza Job

2016-02-16 Thread Roger Hoover
Hi Jeremiah,

There's currently no way to do that.  I think the best way to modify the
existing ElasticsearchSystemProducer would be to add a config option for a
callback to let you customize this behavior.  Basically, a pluggable
listener (
https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java#L101
).



On Mon, Feb 15, 2016 at 2:30 PM, jeremiah adams  wrote:

> We have a samza job configured to run in a yarn cluster. This job consumes
> multiple kafka topics and routes the messages to elasticsearch for
> indexing. When enough batch-updates to elasticsearch fail using the
> ElasticsearchSystemProducer, the entire samza job dies. Due to
> checkpointing + yarn, the job starts back up, starts reading where it left
> off and dies again. Enter loop.
>
> Updates to ES are failing due to invalid data on the part of our consumers
> but I can't always control them, so I need to be defensive about the code. I
> don't see how to handle this in any of the source examples. I would like to
> just trap this error and if it is what I expect it to be - squash it. Can
> someone point me in the right direction?
>
> Below is the log where the failure occurs.
>
> 2016-02-15 18:55:26 ElasticsearchSystemProducer [ERROR] Unable to send
> message from TaskName-Partition 5 to system elastic.
> 2016-02-15 18:55:26 SamzaContainerExceptionHandler [ERROR] Uncaught
> exception in thread (name=main). Exiting process now.
> org.apache.samza.SamzaException: Unable to send message from
> TaskName-Partition 5 to system elastic.
> at
>
> org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.flush(ElasticsearchSystemProducer.java:186)
> at
>
> org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.stop(ElasticsearchSystemProducer.java:92)
> at
>
> org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
> at
>
> org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> at org.apache.samza.system.SystemProducers.stop(SystemProducers.scala:47)
> at
>
> org.apache.samza.container.SamzaContainer.shutdownProducers(SamzaContainer.scala:672)
> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:564)
> at
>
> org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92)
> at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66)
> at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> 2016-02-15 18:55:26 RunLoop [INFO] Shutting down, will wait up to 5000 ms
> 2016-02-15 18:55:31 RunLoop [WARN] Did not shut down within 5000 ms,
> exiting
>
> - jeremiah
>


Re: HTTP-based Elasticsearch system producer and reusable task

2016-02-10 Thread Roger Hoover
Hi Yi,

Please see my comment inline.

On Tue, Feb 9, 2016 at 10:08 PM, Yi Pan  wrote:

> Hi, Roger,
>
> Got it! I would like to understand more on the SystemProducer API changes
> required by #1 and #2. Could you elaborate a bit more?
>
>
For #1, IndexRequestFactory interface (
https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/IndexRequestFactory.java#L32)
returns an object of type (org.elasticsearch.action.index.IndexRequest)
which comes from the elasticsearch jar (
https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java
).

For #2, the way to support updates and deletes would be to have the
"IndexRequestFactory" return a more generic "ActionRequest", which could be
an index, update, or delete action (in which case it should be called
ActionRequestFactory).
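
A hedged sketch of the shape such a factory might take (interface and method names are illustrative, not the current Samza API; the exact ActionRequest type hierarchy varies across Elasticsearch versions):

```java
// Hypothetical ActionRequestFactory sketch.
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.elasticsearch.action.ActionRequest;

public interface ActionRequestFactory {
  /** Map an outgoing envelope to an index, update, or delete request. */
  ActionRequest getActionRequest(OutgoingMessageEnvelope envelope);
}
```
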




> Regarding JDK8 being required in the new HTTP-based Elasticsearch producer, I
> want to ask what motivated you to go w/ JDK8. It does bring a lot more
> nice features. If we drop source-level compatibility with JDK7, we can
> benefit from a lot of new features in JDK8, like lambdas, stream APIs,
> etc. And refactoring Scala code to JDK8 is also much easier.
>
>
I really like that functions are first-class citizens in Java 8, and the
Stream API is quite helpful as well.

For example, first-class functions helped me avoid duplicate code that
would have occurred because JEST doesn't expose a common ancestor type for
each type of builder (index, update, delete).  Passing in functions instead
of objects with a common ancestor type solved the problem.

https://github.com/quantiply/rico/blob/master/samza-elasticsearch/src/main/java/com/quantiply/elasticsearch/HTTPBulkLoader.java#L237-L272
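
A minimal, self-contained sketch of that pattern (illustrative names, not the actual Rico code):

```java
// Illustrative sketch: when two builder types share no useful ancestor,
// accept a function that produces the finished request instead.
import java.util.function.Function;

class BulkWriter<T> {
  private final Function<String, T> requestBuilder;

  BulkWriter(Function<String, T> requestBuilder) {
    this.requestBuilder = requestBuilder;
  }

  T buildRequest(String docJson) {
    // One code path regardless of whether T is an index, update, or delete request type.
    return requestBuilder.apply(docJson);
  }
}

// Usage idea (hypothetical helper methods): the caller supplies the type-specific
// construction as a lambda, so the bulk-writing logic is written once.
// BulkWriter<IndexRequest> indexer = new BulkWriter<>(json -> buildIndexRequest(json));
// BulkWriter<DeleteRequest> deleter = new BulkWriter<>(json -> buildDeleteRequest(json));
```
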


> Thanks!
>
> -Yi
>
> On Tue, Feb 9, 2016 at 4:19 PM, Roger Hoover 
> wrote:
>
> > Hi Yi,
> >
> > It could be merged into the Samza project if there's enough interest but
> > may need some re-working depending on which dependencies are ok to bring
> > in.  I did it outside of the Samza project first because I had to get it
> > done quickly so it relies on Java 8 features, dropwizard metrics for
> > histogram metrics, and JEST (https://github.com/searchbox-io/Jest) which
> > itself drags in more dependencies (Guava, Gson, commons http).
> >
> > There are a few issues with the existing ElasticsearchSystemProducer:
> >
> >1. The plugin API (IndexRequestFactory) is tied to the Elasticsearch
> >Java API (a bulky dependency)
> >2. It only supports index requests.  I needed to also support updates
> >and deletes.
> >3. There is currently no plugin mechanism to register a flush listener.
> >The reason I needed that was to be able to report end to end latency
> > stats
> >(total pipeline latency = commit time - event time).
> >
> > #3 is easily solvable with additional plugin options. #1 and #2 require
> > changing the system producer API.
> >
> > Roger
> >
> > On Tue, Feb 9, 2016 at 10:56 AM, Yi Pan  wrote:
> >
> > > Hi, Roger,
> > >
> > > That's awesome! Are you planning to submit the HTTP-based system
> > > producer in the Samza open-source samza-elasticsearch module? If the
> > > Elasticsearch community suggests that HTTP-based clients are the
> > > recommended way, we should use them in samza-elasticsearch as well. And
> > > what's your opinion on the existing ElasticsearchSystemProducer? If the
> > > SystemProducer APIs and config options do not change, I would vote to
> > > replace the implementation w/ the HTTP-based ElasticsearchSystemProducer.
> > >
> > > Thanks for putting these new additions up!
> > >
> > > -Yi
> > >
> > > On Tue, Feb 9, 2016 at 10:39 AM, Roger Hoover 
> > > wrote:
> > >
> > > > Hi Samza folks,
> > > >
> > > > For people who want to use HTTP to integrate with Elasticsearch, I
> > wrote
> > > an
> > > > HTTP-based system producer and a reusable task, including latency
> stats
> > > > from event origin time, task processing time, and time spent talking
> to
> > > > Elasticsearch API.
> > > >
> > > >
> > >
> >
> https://github.com/quantiply/rico/blob/master/docs/common_tasks/es-push.md
> > > >
> > > > Cheers,
> > > >
> > > > Roger
> > > >
> > >
> >
>


Re: HTTP-based Elasticsearch system producer and reusable task

2016-02-09 Thread Roger Hoover
Hi Yi,

It could be merged into the Samza project if there's enough interest but
may need some re-working depending on which dependencies are ok to bring
in.  I did it outside of the Samza project first because I had to get it
done quickly so it relies on Java 8 features, dropwizard metrics for
histogram metrics, and JEST (https://github.com/searchbox-io/Jest) which
itself drags in more dependencies (Guava, Gson, commons http).

There are a few issues with the existing ElasticsearchSystemProducer:

   1. The plugin API (IndexRequestFactory) is tied to the Elasticsearch
   Java API (a bulky dependency)
   2. It only supports index requests.  I needed to also support updates
   and deletes.
   3. There is currently no plugin mechanism to register a flush listener.
   The reason I needed that was to be able to report end to end latency stats
   (total pipeline latency = commit time - event time).

#3 is easily solvable with additional plugin options. #1 and #2 require
changing the system producer API.

Roger

On Tue, Feb 9, 2016 at 10:56 AM, Yi Pan  wrote:

> Hi, Roger,
>
> That's awesome! Are you planning to submit the HTTP-based system producer
> in the Samza open-source samza-elasticsearch module? If the Elasticsearch
> community suggests that HTTP-based clients are the recommended way, we
> should use them in samza-elasticsearch as well. And what's your opinion on
> the existing ElasticsearchSystemProducer? If the SystemProducer APIs and
> config options do not change, I would vote to replace the implementation
> w/ the HTTP-based ElasticsearchSystemProducer.
>
> Thanks for putting these new additions up!
>
> -Yi
>
> On Tue, Feb 9, 2016 at 10:39 AM, Roger Hoover 
> wrote:
>
> > Hi Samza folks,
> >
> > For people who want to use HTTP to integrate with Elasticsearch, I wrote
> an
> > HTTP-based system producer and a reusable task, including latency stats
> > from event origin time, task processing time, and time spent talking to
> > Elasticsearch API.
> >
> >
> https://github.com/quantiply/rico/blob/master/docs/common_tasks/es-push.md
> >
> > Cheers,
> >
> > Roger
> >
>


HTTP-based Elasticsearch system producer and reusable task

2016-02-09 Thread Roger Hoover
Hi Samza folks,

For people who want to use HTTP to integrate with Elasticsearch, I wrote an
HTTP-based system producer and a reusable task, including latency stats
from event origin time, task processing time, and time spent talking to
Elasticsearch API.

https://github.com/quantiply/rico/blob/master/docs/common_tasks/es-push.md

Cheers,

Roger


Re: Samza 0.10 released

2016-01-04 Thread Roger Hoover
Excellent.  Thanks for all the work by everyone to get this done.  Congrats!

On Mon, Jan 4, 2016 at 11:04 AM, Yi Pan  wrote:

> Hi, all,
>
> In case you missed the announcement of Samza 0.10 release before the
> Christmas, please check it out here: https://blogs.apache.org/samza/ and
> help to spread out the word on twitter.
>
> And happy new year to everyone!
>
> Thanks!
>
> -Yi
>


Re: Executing Samza jobs natively in Kubernetes

2015-11-30 Thread Roger Hoover
Awesome.  Thanks.

On Sun, Nov 29, 2015 at 3:25 PM, Elias Levy 
wrote:

> Roger,
>
> You are welcome.  If you want to experiment, you can use my hello samza
> <https://hub.docker.com/r/elevy/hello-samza/> Docker image.
>
> On Sun, Nov 29, 2015 at 12:19 PM, Roger Hoover 
> wrote:
>
> > Elias,
> >
> > I would also love to be able to deploy Samza on Kubernetes with dynamic
> > task management.  Thanks for sharing this.  It may be a good interim
> > solution.
> >
> > Roger
> >
> > On Sun, Nov 29, 2015 at 11:18 AM, Elias Levy <
> fearsome.lucid...@gmail.com>
> > wrote:
> >
> > > I've been exploring Samza for stream processing as well as Kubernetes
> as
> > a
> > > container orchestration system and I wanted to be able to use one with
> > the
> > > other.  The prospect of having to execute YARN either along side or on
> > top
> > > of Kubernetes did not appeal to me, so I developed a KubernetesJob
> > > implementation of SamzaJob.
> > >
> > > You can find the details at
> > https://github.com/eliaslevy/samza_kubernetes,
> > > but in summary KubernetesJob executes and generates a serialized
> > JobModel.
> > > Instead of interacting with Kubernetes directly to create the
> > > SamzaContainers (as the YarnJob's SamzaApplicationMaster may do with
> the
> > > YARN RM), it outputs a config YAML file that can be used to create the
> > > SamzaContainers in Kubernetes by using Resource Controllers.  For this
> > > you need to package your job as a Docker image.  You can read the
> > > README at the above repo for details.
> > >
> > > A few observations:
> > >
> > > It would be useful if SamzaContainer accepted the JobModel via an
> > > environment variable.  Right now it expects a URL to download it
> > > from.  I get around this by using an entry point script that copies
> > > the model from an environment variable into a file, then passes a
> > > file URL to SamzaContainer.
> > >
> > > SamzaContainer doesn't allow you to configure the JMX port.  It
> selects a
> > > port at random from the ephemeral range as it expects to execute in
> YARN
> > > where a static port could result in a conflict.  This is not the case
> in
> > > Kubernetes where each Pod (i.e. SamzaContainer) is given its own IP
> > > address.
> > >
> > > This implementation doesn't provide a Samza dashboard, which in the
> YARN
> > > implementation is hosted in the Application Master.  There didn't seem
> to
> > > be much value provided by the dashboard that is not already provided by
> > the
> > > Kubernetes tools for monitoring pods.
> > >
> > > I've successfully executed the hello-samza jobs in Kubernetes:
> > >
> > > $ kubectl get po
> > > NAME   READY STATUSRESTARTS   AGE
> > > kafka-1-jjh8n  1/1   Running   0  2d
> > > kafka-2-buycp  1/1   Running   0  2d
> > > kafka-3-tghkp  1/1   Running   0  2d
> > > wikipedia-feed-0-4its2 1/1   Running   0  1d
> > > wikipedia-parser-0-l0onv   1/1   Running   0  17h
> > > wikipedia-parser-1-crrxh   1/1   Running   0  17h
> > > wikipedia-parser-2-1c5nn   1/1   Running   0  17h
> > > wikipedia-stats-0-3gaiu1/1   Running   0  16h
> > > wikipedia-stats-1-j5qlk1/1   Running   0  16h
> > > wikipedia-stats-2-2laos1/1   Running   0  16h
> > > zookeeper-1-1sb4a  1/1   Running   0  2d
> > > zookeeper-2-dndk7  1/1   Running   0  2d
> > > zookeeper-3-46n09  1/1   Running   0  2d
> > >
> > >
> > > Finally, accessing services within the Kubernetes cluster from the
> > outside
> > > is quite cumbersome unless one uses an external load balancer.  This
> > makes
> > > it difficult to bootstrap a job, as SamzaJob must connect to Zookeeper
> > and
> > > Kafka to find out the number of partitions on the topics it will
> > subscribe
> > > to, so it can assign them statically among the number of containers
> > > requested.
> > >
> > > Ideally Samza would operate along the lines of the Kafka high-level
> > > consumer, which dynamically coordinates to allocate work among members
> > > of a consumer group.  This would do away with the need to execute
> > > SamzaJob a priori to generate the JobModel to pass to the
> > > SamzaContainers.  It would also allow for dynamically changing the
> > > number of containers without having to shut down the job.
> > >
> >
>


Re: Executing Samza jobs natively in Kubernetes

2015-11-29 Thread Roger Hoover
Elias,

I would also love to be able to deploy Samza on Kubernetes with dynamic
task management.  Thanks for sharing this.  It may be a good interim
solution.

Roger

On Sun, Nov 29, 2015 at 11:18 AM, Elias Levy 
wrote:

> I've been exploring Samza for stream processing as well as Kubernetes as a
> container orchestration system and I wanted to be able to use one with the
> other.  The prospect of having to execute YARN either along side or on top
> of Kubernetes did not appeal to me, so I developed a KubernetesJob
> implementation of SamzaJob.
>
> You can find the details at https://github.com/eliaslevy/samza_kubernetes,
> but in summary KubernetesJob executes and generates a serialized JobModel.
> Instead of interacting with Kubernetes directly to create the
> SamzaContainers (as the YarnJob's SamzaApplicationMaster may do with the
> YARN RM), it output a config YAML file that can be used to create the
> SamzaContainers in Kubernetes by using Resource Controllers.  For this you
> require to package your job as a Docker image.  You can reach the README at
> the above repo for details.
>
> A few observations:
>
> It would be useful if SamzaContainer accepted the JobModel via an
> environment variable.  Right now it expects a URL to download it from.  I
> get around this by using an entry point script that copies the model from an
> environment variable into a file, then passes a file URL to SamzaContainer.
>
> SamzaContainer doesn't allow you to configure the JMX port.  It selects a
> port at random from the ephemeral range as it expects to execute in YARN
> where a static port could result in a conflict.  This is not the case in
> Kubernetes where each Pod (i.e. SamzaContainer) is given its own IP
> address.
>
> This implementation doesn't provide a Samza dashboard, which in the YARN
> implementation is hosted in the Application Master.  There didn't seem to
> be much value provided by the dashboard that is not already provided by the
> Kubernetes tools for monitoring pods.
>
> I've successfully executed the hello-samza jobs in Kubernetes:
>
> $ kubectl get po
> NAME   READY STATUSRESTARTS   AGE
> kafka-1-jjh8n  1/1   Running   0  2d
> kafka-2-buycp  1/1   Running   0  2d
> kafka-3-tghkp  1/1   Running   0  2d
> wikipedia-feed-0-4its2 1/1   Running   0  1d
> wikipedia-parser-0-l0onv   1/1   Running   0  17h
> wikipedia-parser-1-crrxh   1/1   Running   0  17h
> wikipedia-parser-2-1c5nn   1/1   Running   0  17h
> wikipedia-stats-0-3gaiu1/1   Running   0  16h
> wikipedia-stats-1-j5qlk1/1   Running   0  16h
> wikipedia-stats-2-2laos1/1   Running   0  16h
> zookeeper-1-1sb4a  1/1   Running   0  2d
> zookeeper-2-dndk7  1/1   Running   0  2d
> zookeeper-3-46n09  1/1   Running   0  2d
>
>
> Finally, accessing services within the Kubernetes cluster from the outside
> is quite cumbersome unless one uses an external load balancer.  This makes
> it difficult to bootstrap a job, as SamzaJob must connect to Zookeeper and
> Kafka to find out the number of partitions on the topics it will subscribe
> to, so it can assign them statically among the number of containers
> requested.
>
> Ideally Samza would operate along the lines of the Kafka high-level
> consumer, which dynamically coordinates to allocate work among members of a
> consumer group.  This would do away with the need to execute SamzaJob a
> priori to generate the JobModel to pass to the SamzaContainers.  It would
> also allow for dynamically changing the number of containers without having
> to shut down the job.
>


Re: Sample code or tutorial for writing/reading Avro type message in Samza

2015-11-17 Thread Roger Hoover
Hi Selina,

If you want to use Confluent's schema registry for Avro, then I have an
example in this repo:

https://github.com/theduderog/hello-samza-confluent

Cheers,

Roger

On Tue, Nov 17, 2015 at 12:32 AM, Selina Tech  wrote:

> Dear All:
>  Do you know where I can find a tutorial or sample code for writing
> Avro-type messages to Kafka and reading Avro-type messages from Kafka in
> Samza?
>   I am wondering how I should serialize a GenericRecord to bytes and
> deserialize it?
>  Your comments/suggestions are highly appreciated.
>
> Sincerely,
> Selina
>
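
For the raw Avro part of the question, a minimal sketch using the plain Avro API (note that Confluent's schema-registry wire format additionally prefixes each payload with a magic byte and schema id, so use their serializers if you go the registry route):

```java
// Minimal sketch of GenericRecord <-> byte[] using the standard Avro API.
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroBytes {
  public static byte[] serialize(Schema schema, GenericRecord record) throws Exception {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
    encoder.flush();
    return out.toByteArray();
  }

  public static GenericRecord deserialize(Schema schema, byte[] bytes) throws Exception {
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
    return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
  }
}
```
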


Re: Checkpoint tool not working

2015-10-30 Thread Roger Hoover
I tried it once with 0.9.1 and it didn't work for me either.  I didn't have
time to examine it more carefully at the time.

Roger

On Thu, Oct 29, 2015 at 10:05 PM, Lukas Steiblys 
wrote:

> I'm using Samza 0.9.1.
>
> Lukas
>
> On 10/29/15, Yi Pan  wrote:
> > Hi, Lukas,
> >
> > Which version of checkpoint-tool are you using?
> >
> > -Yi
> >
> > On Thu, Oct 29, 2015 at 5:39 PM, Lukas Steiblys 
> > wrote:
> >
> >> Hello,
> >>
> >> I’m trying to write the checkpoints for a Samza task supplying these
> >> arguments to the checkpoint tool:
> >>
> >> bin/checkpoint-tool.sh
> >> --new-offsets=file:///checkpoints/client-metrics.properties
> >> --config-path=file:///checkpoints/task.properties
> >>
> >> However, it doesn’t actually write the checkpoints and, instead, prints
> >> out the current checkpoints.
> >>
> >> Is the tool currently broken? A sample of the client-metrics.properties
> >> file:
> >>
> >> tasknames.Partition 13.systems.kafka.streams.Sessions.partitions.13=723
> >> tasknames.Partition 14.systems.kafka.streams.Sessions.partitions.14=14589258
> >> tasknames.Partition 15.systems.kafka.streams.Sessions.partitions.15=10886881
> >>
> >> Lukas
> >
>


Snapshot metrics stop getting scheduled if an exception occurs

2015-10-26 Thread Roger Hoover
Hi Samza devs,

I ran into an issue with Samza 0.9.1 where I had a serialization exception
thrown in the MetricsSnapshotReporter.  It's very hard to find because
nothing is logged and the metrics just stop getting scheduled.  Samza
should catch all exceptions in that thread, log them, and suppress them
rather than no longer scheduling snapshot metrics at all.

JIRA filed here:
https://issues.apache.org/jira/browse/SAMZA-801
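
For context, this is the standard guard for ScheduledExecutorService tasks, which stop being rescheduled once a run throws; a minimal sketch of the pattern (illustrative, not the actual MetricsSnapshotReporter code):

```java
// Illustrative sketch: wrap the periodic task so an exception is logged
// instead of silently killing the schedule.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SafeReporter {
  public static void main(String[] args) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleAtFixedRate(() -> {
      try {
        publishSnapshot();  // may throw, e.g. a serialization exception
      } catch (Throwable t) {
        // Without this catch, scheduleAtFixedRate suppresses all future runs.
        System.err.println("Failed to publish metrics snapshot: " + t);
      }
    }, 0, 60, TimeUnit.SECONDS);
  }

  static void publishSnapshot() { /* serialize and send metrics here */ }
}
```
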

Cheers,

Roger


Re: Samza and KStreams (KIP-28): LinkedIn's POV

2015-10-05 Thread Roger Hoover
Great.  Thanks, Yi.

On Mon, Oct 5, 2015 at 10:25 AM, Yi Pan  wrote:

> Hi, Roger,
>
>
> On Sat, Oct 3, 2015 at 11:13 AM, Roger Hoover 
> wrote:
>
> > As previously discussed, the biggest request I
> > have is being able to run Samza without YARN, under something like
> > Kubernetes instead.
> >
> >
> Totally. We will be actively working on the standalone Samza after the
> upcoming 0.10 release.
>
>
> > Also, I'm curious.  What's the current state of the Samza SQL physical
> > operators?  Are they used in production yet?  Is there a doc on how to
> use
> > them?
> >
> >
> The current physical operators code now lives in the samza-sql branch. There
> are still two big pending check-ins in review right now, one to stabilize
> the operator APIs, the other to implement the window operator. We are
> planning to finish the prototype in Q4.
>
> Regards,
>
> -Yi
>


Re: Samza and KStreams (KIP-28): LinkedIn's POV

2015-10-03 Thread Roger Hoover
Hi Yi,

Thank you for sharing this update and perspective.  I tend to agree that
for simple, stateless cases, things could be easier and hopefully KStreams
may help with that.  I also appreciate a lot of features that Samza already
supports for operations.  As previously discussed, the biggest request I
have is being able to run Samza without YARN, under something like
Kubernetes instead.

Also, I'm curious.  What's the current state of the Samza SQL physical
operators?  Are they used in production yet?  Is there a doc on how to use
them?

Thanks,

Roger



On Fri, Oct 2, 2015 at 1:54 PM, Yi Pan  wrote:

> Hi, all Samza-lovers,
>
> This question on the relationship of Kafka KStream (KIP-28) and Samza has
> come up a couple times recently. So we wanted to clarify where we stand at
> LinkedIn in terms of this discussion.
>
> Samza has historically had a symbiotic relationship with Kafka and will
> continue to work very well with Kafka.  Earlier in the year, we had an
> in-depth discussion exploring an even deeper integration with Kafka.  After
> hitting multiple practical issues (e.g. Apache rules) and technical issues
> we had to give up on that idea.  As fallout from the discussion, the Kafka
> community is adding some of the basic event processing capabilities into
> Kafka core directly. The basic callback/push style programming model by
> itself is a great addition to the Kafka API set.
>
> However at LinkedIn, we continue to be firmly committed to Samza as our
> stream processing framework. Although KStream is a nice addition to Kafka
> stack, our goals for Samza are broader. There are some key technical
> differences that makes Samza the right strategy for us.
>
> 1.  Support for non-kafka systems :
>
> At LinkedIn a larger percentage of our streaming jobs use Databus as an
> input source.   For any such non-Kafka source, although the CopyCat
> connector framework gives a common model for pulling data out of a source
> and pushing it into Kafka, it introduces yet another piece of
> infrastructure that we have to operate and manage.  Also for any companies
> who are already on AWS, Google Compute, Azure etc.  asking them to deploy
> and operate kafka in AWS instead of using the natively supported services
> like Kinesis, Google Cloud pub-sub, etc. etc. can potentially be a
> non-starter.  With more acquisitions at LinkedIn that use AWS we are also
> facing this first hand.  The Samza community has a healthy set of system
> producers/consumers which are in the works (Kinesis, ActiveMQ,
> ElasticSearch, HDFS, etc.).
>
> 2. We run Samza as a Stream Processing Service at LinkedIn. This is
> fundamentally different from KStream.
>
> This is similar to AWS Lambda and Google Cloud Dataflow, Azure Stream
> Insight and similar services.  The service makes it much easier for
> developers to get their stream processing jobs up and running in production
> by helping with the most common problems like monitoring, dashboards,
> auto-scale, rolling upgrades and such.
>
> The truth is that if the stream processing application is stateless then
> some of these common problems are not as involved and can be solved even on
> regular IaaS platforms like EC2 and such.   Arguably stateless applications
> can be built easily on top of the native APIs from the input source like
> kafka, kinesis etc.
>
> The place where Samza shines is with stateful stream processing
> applications.  When a Samza application uses the local rocks DB based
> state, the application needs special care in terms of rolling upgrades,
> addition/removal of containers/machines, temporary machine failures,
> capacity management.  We have already done some really good work in Samza
> 0.10 to make sure that we don't reseed the state from kafka (i.e.
> host-affinity feature that allows reusing the local state).  In the
> absence of this feature, we had multiple production issues caused by
> network saturation during state reseeding from kafka.   The problems with
> stateful applications are similar to problems encountered when building
> no-sql databases and other data systems.
>
> There are surely some scenarios where customers don't want any YARN
> dependency and want to run their stream processing application on a
> dedicated set of nodes.  This is where KStream clearly has an advantage
> over current Samza. Prior to KStream we had a patch in Samza which also
> solved the same problem (SAMZA-516). We do expect to finish this patch soon
> and formally support Stand Alone Samza.
>
> 3. Operators for Stream Processing and SQL :
>
> At LinkedIn, there is a growing need to iterate Samza jobs faster in the
> loop of implement, deploy, revise the code, and deploy again. A key
> bottleneck that slows down this iteration is the implementation of a Samza
> job. It has long been recognized in the Samza community that there is a
> strong need for a high-level language support to shorten this iterative
> process. Since last year, we have identified SQL as the 

Re: New Samza blog published - http://engineering.linkedin.com/performance/benchmarking-apache-samza-12-million-messages-second-single-node

2015-08-25 Thread Roger Hoover
Thanks for sharing!

Tao, did you use YARN to run 15 containers or is there a way to have them
statically divide up the tasks?



On Mon, Aug 24, 2015 at 12:03 PM, Ed Yakabosky <
eyakabo...@linkedin.com.invalid> wrote:

> Hi Samza open source,
>
> I want to share that Tao Feng
>  (from LinkedIn's
> Performance Team) has published a blog post
> <
> http://engineering.linkedin.com/performance/benchmarking-apache-samza-12-million-messages-second-single-node
> >
> on
> Samza perf benchmarks in collaboration with our development team.  A lot of
> hard work went into this blog post so please join me in congratulating Tao
> and other contributors, and please share to your "Big Data" social media
> circles.
>
> Synopsis: The objective of the blog is to measure Samza's performance in
> terms of the message-processing rate for a single machine for typical Samza
> use cases. This will help engineers to understand and to optimize
> performance and provide a basis for establishing a capacity model to run
> Samza platform as a service.
>
> Thanks,
> Ed Yakabosky
> Streams Infrastructure TPM, LinkedIn
>


Re: [Discuss/Vote] upgrade to Yarn 2.6.0

2015-08-24 Thread Roger Hoover
Jordan,

Thanks for sharing that.

Roger

On Mon, Aug 24, 2015 at 10:17 AM, Jordan Shaw  wrote:

> Roger,
> We upgraded from yarn 2.4 to 2.6 a while ago and have been running it in prod
> with no issues. It was basically a drop-in if I remember right.
>
> Jordan
>
> > On Aug 20, 2015, at 1:48 PM, Yi Pan  wrote:
> >
> > Hi, Selina,
> >
> > Samza 0.9.1 on YARN 2.6 is the proved working solution.
> >
> > Best,
> >
> > -Yi
> >
> >> On Thu, Aug 20, 2015 at 12:28 PM, Selina Tech 
> wrote:
> >>
> >> Hi, Yi:
> >> If I use Samza 0.9.1 and Yarn 2.6.0, will the system fail?
> >>
> >> Sincerely,
> >> Selina
> >>
> >>> On Wed, Aug 19, 2015 at 1:58 PM, Yi Pan  wrote:
> >>>
> >>> Hi, Roger,
> >>>
> >>> In LinkedIn we have already moved to YARN 2.6 and are moving to YARN 2.7
> >>> now. I am not aware of any major issues in upgrading. I will let our
> >>> team member Jon Bringhurst chime in since he did all the upgrade work
> >>> and may have more insights.
> >>>
> >>> @Jon, could you help to comment on this?
> >>>
> >>> Thanks!
> >>>
> >>> -Yi
> >>>
> >>> On Wed, Aug 19, 2015 at 9:12 AM, Roger Hoover 
> >>> wrote:
> >>>
> >>>> We're using 2.4.0 in production.  Are there any major
> incompatibilities
> >>> to
> >>>> watch out for when upgrading to 2.6.0?
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Roger
> >>>>
> >>>> On Mon, Aug 17, 2015 at 4:41 PM, Yan Fang 
> >> wrote:
> >>>>
> >>>>> Hi guys,
> >>>>>
> >>>>> we have been discussing upgrading to Yarn 2.6.0 (SAMZA-536
> >>>>> <https://issues.apache.org/jira/browse/SAMZA-536>), because there
> >> are
> >>>> some
> >>>>> bug fixes after 2.4.0 and we can not enable the Yarn RM recovering
> >>>> feature
> >>>>> in Yarn 2.4.0 (SAMZA-750 <
> >>>> https://issues.apache.org/jira/browse/SAMZA-750
> >>>>>> )
> >>>>> .
> >>>>>
> >>>>> So we just want to make sure if any production users are still using
> >>> Yarn
> >>>>> 2.4.0 and do not plan to upgrade to 2.6.0+?
> >>>>>
> >>>>> If there are no further concerns, I think we can go and upgrade to Yarn 2.6.0
> >> in
> >>>>> Samza 0.10.0 release.
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Fang, Yan
> >>>>> yanfang...@gmail.com
> >>
>


Re: [Discuss/Vote] upgrade to Yarn 2.6.0

2015-08-24 Thread Roger Hoover
Works for me.

Sent from my iPhone

> On Aug 24, 2015, at 7:48 AM, Yan Fang  wrote:
> 
> Hi Roger,
> 
> If you have a plan to upgrade to 2.6.0, and no other companies are using
> 2.4.0, I think we can upgrade to yarn 2.6.0 in 0.10.0.
> 
> Thanks,
> 
> Fang, Yan
> yanfang...@gmail.com
> 
>> On Thu, Aug 20, 2015 at 4:48 PM, Yi Pan  wrote:
>> 
>> Hi, Selina,
>> 
>> Samza 0.9.1 on YARN 2.6 is the proved working solution.
>> 
>> Best,
>> 
>> -Yi
>> 
>> On Thu, Aug 20, 2015 at 12:28 PM, Selina Tech 
>> wrote:
>> 
>>> Hi, Yi:
>>> If I use Samza 0.9.1 and Yarn 2.6.0, will the system fail?
>>> 
>>> Sincerely,
>>> Selina
>>> 
>>>> On Wed, Aug 19, 2015 at 1:58 PM, Yi Pan  wrote:
>>>> 
>>>> Hi, Roger,
>>>> 
>>>> In LinkedIn we have already moved to YARN 2.6 and are moving to YARN 2.7
>>>> now. I am not aware of any major issues in upgrading. I will let our
>>>> team member Jon Bringhurst chime in since he did all the upgrade work
>>>> and may have more insights.
>>>> 
>>>> @Jon, could you help to comment on this?
>>>> 
>>>> Thanks!
>>>> 
>>>> -Yi
>>>> 
>>>> On Wed, Aug 19, 2015 at 9:12 AM, Roger Hoover 
>>>> wrote:
>>>> 
>>>>> We're using 2.4.0 in production.  Are there any major
>> incompatibilities
>>>> to
>>>>> watch out for when upgrading to 2.6.0?
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Roger
>>>>> 
>>>>> On Mon, Aug 17, 2015 at 4:41 PM, Yan Fang 
>>> wrote:
>>>>> 
>>>>>> Hi guys,
>>>>>> 
>>>>>> we have been discussing upgrading to Yarn 2.6.0 (SAMZA-536
>>>>>> <https://issues.apache.org/jira/browse/SAMZA-536>), because there
>>> are
>>>>> some
>>>>>> bug fixes after 2.4.0 and we can not enable the Yarn RM recovering
>>>>> feature
>>>>>> in Yarn 2.4.0 (SAMZA-750 <
>>>>> https://issues.apache.org/jira/browse/SAMZA-750
>>>>>>> )
>>>>>> .
>>>>>> 
>>>>>> So we just want to make sure if any production users are still
>> using
>>>> Yarn
>>>>>> 2.4.0 and do not plan to upgrade to 2.6.0+?
>>>>>> 
>>>>>> If there are no further concerns, I think we can go and upgrade to Yarn 2.6.0
>>> in
>>>>>> Samza 0.10.0 release.
>>>>>> 
>>>>>> Thanks,
>>>>>> 
>>>>>> Fang, Yan
>>>>>> yanfang...@gmail.com
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 


Re: [Discuss/Vote] upgrade to Yarn 2.6.0

2015-08-19 Thread Roger Hoover
Thanks, Yi!

On Wed, Aug 19, 2015 at 1:58 PM, Yi Pan  wrote:

> Hi, Roger,
>
> In LinkedIn we have already moved to YARN 2.6 and are moving to YARN 2.7
> now. I am not aware of any major issues in upgrading. I will let our team
> member Jon Bringhurst chime in since he did all the upgrade work and may
> have more insights.
>
> @Jon, could you help to comment on this?
>
> Thanks!
>
> -Yi
>
> On Wed, Aug 19, 2015 at 9:12 AM, Roger Hoover 
> wrote:
>
> > We're using 2.4.0 in production.  Are there any major incompatibilities
> to
> > watch out for when upgrading to 2.6.0?
> >
> > Thanks,
> >
> > Roger
> >
> > On Mon, Aug 17, 2015 at 4:41 PM, Yan Fang  wrote:
> >
> > > Hi guys,
> > >
> > > we have been discussing upgrading to Yarn 2.6.0 (SAMZA-536
> > > <https://issues.apache.org/jira/browse/SAMZA-536>), because there are
> > some
> > > bug fixes after 2.4.0 and we can not enable the Yarn RM recovering
> > feature
> > > in Yarn 2.4.0 (SAMZA-750 <
> > https://issues.apache.org/jira/browse/SAMZA-750
> > > >)
> > > .
> > >
> > > So we just want to make sure if any production users are still using
> Yarn
> > > 2.4.0 and do not plan to upgrade to 2.6.0+?
> > >
> > > If there are no further concerns, I think we can go and upgrade to Yarn 2.6.0 in
> > > Samza 0.10.0 release.
> > >
> > > Thanks,
> > >
> > > Fang, Yan
> > > yanfang...@gmail.com
> > >
> >
>


Re: [Discuss/Vote] upgrade to Yarn 2.6.0

2015-08-19 Thread Roger Hoover
We're using 2.4.0 in production.  Are there any major incompatibilities to
watch out for when upgrading to 2.6.0?

Thanks,

Roger

On Mon, Aug 17, 2015 at 4:41 PM, Yan Fang  wrote:

> Hi guys,
>
> we have been discussing upgrading to Yarn 2.6.0 (SAMZA-536
> ), because there are some
> bug fixes after 2.4.0 and we can not enable the Yarn RM recovering feature
> in Yarn 2.4.0 (SAMZA-750  >)
> .
>
> So we just want to make sure if any production users are still using Yarn
> 2.4.0 and do not plan to upgrade to 2.6.0+?
>
> If there are no further concerns, I think we can go and upgrade to Yarn 2.6.0 in
> Samza 0.10.0 release.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>


Re: Use one producer for both coordinator stream and users system?

2015-08-18 Thread Roger Hoover
Hi Yan,

My (uneducated) guess is that the performance gains come from batching.  I
don't know if the new producer ever batches by destination broker.  If not
and it only batches by (broker,topic,partition) then I doubt that one vs
two producers will affect performance as they send to different topics.

Cheers,

Roger

On Tue, Aug 18, 2015 at 12:26 AM, Yan Fang  wrote:

> Hi Tao,
>
> First, one kafka producer has an i/o thread. (correct me if I am wrong).
>
> Second, after Samza 0.10.0, we have a coordinator stream, which stores the
> checkpoint, config and other locality information for auto-scaling, dynamic
> configuration, etc purpose. (See Samza-348
> ). So we have a producer
> for this coordinator stream.
>
> Therefore, each container will have at least two producers, one is for the
> coordinator stream, one is for the users system.
>
> My question is, can we use only one producer for both coordinator stream
> and the users system to have better performance? (from the doc, it may
> yield better performance.)
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Mon, Aug 17, 2015 at 9:49 PM, Tao Feng  wrote:
>
> > Hi Yan,
> >
> > Naive question: what do we need producer thread of coordinator stream
> for?
> >
> > Thanks,
> > -Tao
> >
> > On Mon, Aug 17, 2015 at 2:09 PM, Yan Fang  wrote:
> >
> > > Hi guys,
> > >
> > > I have this question because Kafka's doc
> > > <
> > >
> >
> http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
> > > >
> > > seems recommending having one producer shared by all threads ("*The
> > > producer is thread safe and should generally be shared among all
> threads
> > > for best performance.*"), while currently the coordinator stream is
> > using a
> > > separate producer (usually, there are two producers(two producer
> threads)
> > > in each container: one is for the coordinator stream , one is for the
> > > "real" job)
> > >
> > > 1. Will having one producer shared by all threads really improve the
> > > performance? (haven't done the perf test myself. Guess Kafka has some
> > > proof).
> > >
> > > 2. if yes, should we go this way?
> > >
> > > Thanks,
> > >
> > > Fang, Yan
> > > yanfang...@gmail.com
> > >
> >
>


Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Roger Hoover
Thanks, Yi!

On Wed, Jul 29, 2015 at 12:16 PM, Yi Pan  wrote:

> Hi, Roger,
>
> I am testing the patch now. Will update the JIRA soon.
>
> Thanks!
>
> -Yi
>
> On Wed, Jul 29, 2015 at 12:11 PM, Roger Hoover 
> wrote:
>
> > Thank you, Dan.  I think we're ready to merge.  Can one of the Samza
> > committers please take a look?
> >
> > On Wed, Jul 29, 2015 at 11:31 AM, Dan Harvey 
> > wrote:
> >
> > >This is an automatically generated e-mail. To reply, visit:
> > > https://reviews.apache.org/r/36815/
> > >
> > > On July 29th, 2015, 8:42 a.m. UTC, *Dan Harvey* wrote:
> > >
> > >
> > >
> >
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
> > > <
> >
> https://reviews.apache.org/r/36815/diff/6/?file=1024157#file1024157line116
> >
> > (Diff
> > > revision 6)
> > >
> > > public void register(final String source) {
> > >
> > >116
> > >
> > >   LOGGER.info(itemResp.getFailureMessage());
> > >
> > >   Should we add a Samza-specific message, then add the whole exception?
> > so it's more clear what the exception was from if the user doesn't know
> the
> > code? Logger.info("Failed to index message in ElasticSearch.", e);
> > >
> > > This would also be true for other log lines added.
> > >
> > >  On July 29th, 2015, 6:22 p.m. UTC, *Roger Hoover* wrote:
> > >
> > > Good idea.  Thanks.
> > >
> > > BTW, it didn't work like this: Logger.info("Failed to index message in
> > ElasticSearch.", itemResp.getFailure()) so I did this:
> > >
> > > LOGGER.error("Failed to index document in Elasticsearch: " +
> > itemResp.getFailureMessage());
> > >
> > >  On July 29th, 2015, 6:28 p.m. UTC, *Roger Hoover* wrote:
> > >
> > > This is what the messages look like
> > >
> > > 2015-07-29 11:15:02 ElasticsearchSystemProducer [INFO] Failed to index
> > document in Elasticsearch:
> > VersionConflictEngineException[[test-embedded.2015-07-29][0] [stuff][d3]:
> > version conflict, current [9], provided [5]]
> > >
> > >  That looks fine!
> > >
> > >
> > > - Dan
> > >
> > > On July 29th, 2015, 6:24 p.m. UTC, Roger Hoover wrote:
> > >   Review request for samza and Dan Harvey.
> > > By Roger Hoover.
> > >
> > > *Updated July 29, 2015, 6:24 p.m.*
> > >  *Repository: * samza
> > > Description
> > >
> > > SAMZA-741 Add support for versioning to Elasticsearch System Producer
> > >
> > >   Testing
> > >
> > > Refactored DefaultIndexRequestFactory to make it easier to subclass and
> > customize to handle version and version_type parameters.
> > >
> > >   Diffs
> > >
> > >-
> >
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
> > >(f61bd36)
> > >-
> >
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
> > >(e3b635b)
> > >-
> >
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
> > >(afe0eee)
> > >-
> >
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
> > >(980964f)
> > >-
> >
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
> > >(684d7f6)
> > >
> > > View Diff <https://reviews.apache.org/r/36815/diff/>
> > >
> >
>


Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Roger Hoover
Thank you, Dan.  I think we're ready to merge.  Can one of the Samza
committers please take a look?

On Wed, Jul 29, 2015 at 11:31 AM, Dan Harvey  wrote:

>This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36815/
>
> On July 29th, 2015, 8:42 a.m. UTC, *Dan Harvey* wrote:
>
>
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
> <https://reviews.apache.org/r/36815/diff/6/?file=1024157#file1024157line116> 
> (Diff
> revision 6)
>
> public void register(final String source) {
>
>116
>
>   LOGGER.info(itemResp.getFailureMessage());
>
>   Should we add a Samza-specific message, then add the whole exception? so 
> it's more clear what the exception was from if the user doesn't know the 
> code? Logger.info("Failed to index message in ElasticSearch.", e);
>
> This would also be true for other log lines added.
>
>  On July 29th, 2015, 6:22 p.m. UTC, *Roger Hoover* wrote:
>
> Good idea.  Thanks.
>
> BTW, it didn't work like this: Logger.info("Failed to index message in 
> ElasticSearch.", itemResp.getFailure()) so I did this:
>
> LOGGER.error("Failed to index document in Elasticsearch: " + 
> itemResp.getFailureMessage());
>
>  On July 29th, 2015, 6:28 p.m. UTC, *Roger Hoover* wrote:
>
> This is what the messages look like
>
> 2015-07-29 11:15:02 ElasticsearchSystemProducer [INFO] Failed to index 
> document in Elasticsearch: 
> VersionConflictEngineException[[test-embedded.2015-07-29][0] [stuff][d3]: 
> version conflict, current [9], provided [5]]
>
>  That looks fine!
>
>
> - Dan
>
> On July 29th, 2015, 6:24 p.m. UTC, Roger Hoover wrote:
>   Review request for samza and Dan Harvey.
> By Roger Hoover.
>
> *Updated July 29, 2015, 6:24 p.m.*
>  *Repository: * samza
> Description
>
> SAMZA-741 Add support for versioning to Elasticsearch System Producer
>
>   Testing
>
> Refactored DefaultIndexRequestFactory to make it easier to subclass and 
> customize to handle version and version_type parameters.
>
>   Diffs
>
>- 
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>(f61bd36)
>- 
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>(e3b635b)
>- 
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
>(afe0eee)
>- 
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
>(980964f)
>- 
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
>(684d7f6)
>
> View Diff <https://reviews.apache.org/r/36815/diff/>
>


Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Roger Hoover


> On July 29, 2015, 8:42 a.m., Dan Harvey wrote:
> > samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
> >  line 116
> > <https://reviews.apache.org/r/36815/diff/6/?file=1024157#file1024157line116>
> >
> > Should we add a Samza-specific message, then add the whole exception? so 
> > it's more clear what the exception was from if the user doesn't know the 
> > code? `Logger.info("Failed to index message in ElasticSearch.", e);`
> > 
> > This would also be true for other log lines added.
> 
> Roger Hoover wrote:
> Good idea.  Thanks.
> 
> BTW, it didn't work like this: Logger.info("Failed to index message in 
> ElasticSearch.", itemResp.getFailure()) so I did this:
> 
> LOGGER.error("Failed to index document in Elasticsearch: " + 
> itemResp.getFailureMessage());

This is what the messages look like

```2015-07-29 11:15:02 ElasticsearchSystemProducer [INFO] Failed to index 
document in Elasticsearch: 
VersionConflictEngineException[[test-embedded.2015-07-29][0] [stuff][d3]: 
version conflict, current [9], provided [5]]```


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/#review93413
---


On July 29, 2015, 6:24 p.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36815/
> ---
> 
> (Updated July 29, 2015, 6:24 p.m.)
> 
> 
> Review request for samza and Dan Harvey.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-741 Add support for versioning to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  f61bd36 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  e3b635b 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
>  afe0eee 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
>  980964f 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
>  684d7f6 
> 
> Diff: https://reviews.apache.org/r/36815/diff/
> 
> 
> Testing
> ---
> 
> Refactored DefaultIndexRequestFactory to make it easier to subclass and 
> customize to handle version and version_type parameters.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/
---

(Updated July 29, 2015, 6:24 p.m.)


Review request for samza and Dan Harvey.


Changes
---

More descriptive log messages


Repository: samza


Description
---

SAMZA-741 Add support for versioning to Elasticsearch System Producer


Diffs (updated)
-

  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 f61bd36 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 e3b635b 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
 afe0eee 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
 980964f 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
 684d7f6 

Diff: https://reviews.apache.org/r/36815/diff/


Testing
---

Refactored DefaultIndexRequestFactory to make it easier to subclass and 
customize to handle version and version_type parameters.


Thanks,

Roger Hoover



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Roger Hoover


> On July 29, 2015, 8:42 a.m., Dan Harvey wrote:
> > samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
> >  line 116
> > <https://reviews.apache.org/r/36815/diff/6/?file=1024157#file1024157line116>
> >
> > Should we add a Samza-specific message, then add the whole exception, so 
> > it's clearer what the exception was from if the user doesn't know the 
> > code? `Logger.info("Failed to index message in ElasticSearch.", e);`
> > 
> > This would also be true for other log lines added.

Good idea.  Thanks.

BTW, it didn't work like this: Logger.info("Failed to index message in 
ElasticSearch.", itemResp.getFailure()) so I did this:

LOGGER.error("Failed to index document in Elasticsearch: " + 
itemResp.getFailureMessage());
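
A minimal sketch of why the two-argument call drops the detail, assuming SLF4J's
usual argument handling and the Elasticsearch BulkItemResponse API (the class and
method below are illustrative, not the actual patch):

```
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BulkFailureLoggingSketch {
    private static final Logger LOGGER = LoggerFactory.getLogger(BulkFailureLoggingSketch.class);

    static void logFailure(BulkItemResponse itemResp) {
        // Drops the detail: BulkItemResponse.Failure is not a Throwable, and the
        // message has no "{}" placeholder, so SLF4J ignores the second argument.
        LOGGER.info("Failed to index message in ElasticSearch.", itemResp.getFailure());

        // Either give the failure message a placeholder...
        LOGGER.error("Failed to index document in Elasticsearch: {}", itemResp.getFailureMessage());

        // ...or concatenate it, as in the change above.
        LOGGER.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
    }
}
```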


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/#review93413
---


On July 29, 2015, 6:22 a.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36815/
> ---
> 
> (Updated July 29, 2015, 6:22 a.m.)
> 
> 
> Review request for samza and Dan Harvey.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-741 Add support for versioning to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  f61bd36 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  e3b635b 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
>  afe0eee 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
>  980964f 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
>  684d7f6 
> 
> Diff: https://reviews.apache.org/r/36815/diff/
> 
> 
> Testing
> ---
> 
> Refactored DefaultIndexRequestFactory to make it easier to subclass and 
> customize to handle version and version_type parameters.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: changelog compaction problem

2015-07-29 Thread Roger Hoover
You also may want to check if the cleaner thread in the broker is still
alive (using jstack).  I've run into this issue and used the fix mentioned
in the ticket to get compaction working again.

https://issues.apache.org/jira/browse/KAFKA-1641
I'd just like to mention that a possible workaround (depending on your
situation in regard to keys) is to stop the broker, remove the cleaner
offset checkpoint, and then start the broker again for each ISR member in
serial to get the thread running again. Keep in mind that the cleaner will
start from the beginning if you do this.
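
For reference, the kind of job described in the quoted message below looks roughly
like the sketch that follows when written against the Samza task APIs (the store
name, types, and expiry rule are assumptions, not the poster's actual code). Every
put() and delete() against a changelogged store also writes to the changelog topic,
which is what compaction is expected to keep bounded:

```
import java.util.ArrayList;
import java.util.List;

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;

public class ExpiringStoreTask implements StreamTask, WindowableTask, InitableTask {
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(Config config, TaskContext context) {
        // Backed by a changelog topic when stores.<name>.changelog is configured.
        store = (KeyValueStore<String, String>) context.getStore("my-store");
    }

    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                        TaskCoordinator coordinator) {
        // Create or update a key; each put() also appends to the changelog.
        store.put((String) envelope.getKey(), (String) envelope.getMessage());
    }

    @Override
    public void window(MessageCollector collector, TaskCoordinator coordinator) {
        // Once a minute, delete keys whose value says they are no longer needed.
        List<String> toDelete = new ArrayList<String>();
        KeyValueIterator<String, String> it = store.all();
        try {
            while (it.hasNext()) {
                Entry<String, String> entry = it.next();
                if (entry.getValue().contains("expired")) { // placeholder condition
                    toDelete.add(entry.getKey());
                }
            }
        } finally {
            it.close();
        }
        for (String key : toDelete) {
            store.delete(key); // each delete() appends a tombstone to the changelog
        }
    }
}
```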

On Wed, Jul 29, 2015 at 8:43 AM, Chinmay Soman 
wrote:

> Just curious,
>
> Can you double check if you have log compaction enabled on your Kafka
> brokers ?
>
> On Wed, Jul 29, 2015 at 8:30 AM, Vladimir Lebedev  wrote:
>
> > Hello,
> >
> > I have a problem with changelog in one of my samza jobs grows
> indefinitely.
> >
> > The job is quite simple, it reads messages from the input kafka topic,
> and
> > either creates or updates a key in task-local samza store. Once in a
> minute
> > the window method kicks-in, it iterates over all keys in the store and
> > deletes some of them, selecting on the contents of their value.
> >
> > Message rate in input topic is about 3000 messages per second. The input
> > topic is partitioned in 48 partitions. The average number of keys kept in
> > the store is more or less stable and does not exceed 10,000 keys per task.
> > The average size of values is 50 bytes. So I expected that the sum of all
> > segments' sizes in the Kafka data directory for the job's changelog topic
> > should not exceed 10,000*50*48 ~= 24Mbytes. In fact it is more than 2.5GB
> > (after 6 days
> > running from scratch) and it is growing.
> >
> > I tried to change the default segment size for the changelog topic in Kafka,
> > and it worked a bit - instead of 500Mbyte segments I now have 50Mbyte
> > segments, but it did not heal the indefinite data growth problem.
> >
> > Moreover, if I stop the job and start it again, it cannot restart; it
> > breaks right after reading all records from the changelog topic.
> >
> > Did somebody have a similar problem? How could it be resolved?
> >
> > Best regards,
> > Vladimir
> >
> > --
> > Vladimir Lebedev
> > w...@fastmail.fm
> >
> >
>
>
> --
> Thanks and regards
>
> Chinmay Soman
>


Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-28 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/
---

(Updated July 29, 2015, 6:22 a.m.)


Review request for samza and Dan Harvey.


Changes
---

Making the invalid response type an error level log message


Repository: samza


Description
---

SAMZA-741 Add support for versioning to Elasticsearch System Producer


Diffs (updated)
-

  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 f61bd36 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 e3b635b 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
 afe0eee 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
 980964f 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
 684d7f6 

Diff: https://reviews.apache.org/r/36815/diff/


Testing
---

Refactored DefaultIndexRequestFactory to make it easier to subclass and 
customize to handle version and version_type parameters.


Thanks,

Roger Hoover



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-28 Thread Roger Hoover


> On July 29, 2015, 5:47 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
> >  line 149
> > <https://reviews.apache.org/r/36815/diff/4/?file=1024086#file1024086line149>
> >
> > Quick question: Is it guaranteed that there is no DeleteResponse here? 
> > It would be good to at least log a warn if we get an unexpected response 
> > here.
> 
> Roger Hoover wrote:
> It is guaranteed that you will not get a DeleteResponse back because the 
> producer currently only allows IndexRequests.  In the future, if it supports 
> DeleteRequest then we should add a counter metric for deletes.
> 
> Yi Pan (Data Infrastructure) wrote:
> @Roger, thanks for the explanation. My point is: it would be good to make 
> the code detect and log unexpected response.

Oh, I see.  I just added a warning log to do that.
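
Roughly the kind of check involved, shown only as a sketch (it assumes the
Elasticsearch bulk API types; the actual change is in the review diff):

```
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BulkResponseCheckSketch {
    private static final Logger LOGGER = LoggerFactory.getLogger(BulkResponseCheckSketch.class);

    static void handle(BulkResponse bulkResponse) {
        for (BulkItemResponse itemResp : bulkResponse) {
            if (itemResp.isFailed()) {
                continue; // failures are handled separately
            }
            ActionResponse resp = itemResp.getResponse();
            if (!(resp instanceof IndexResponse)) {
                // only IndexRequests are issued today, so anything else is unexpected
                LOGGER.warn("Unexpected Elasticsearch response of type: {}", resp.getClass().getName());
            }
        }
    }
}
```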


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/#review93395
---


On July 29, 2015, 6:22 a.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36815/
> ---
> 
> (Updated July 29, 2015, 6:22 a.m.)
> 
> 
> Review request for samza and Dan Harvey.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-741 Add support for versioning to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  f61bd36 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  e3b635b 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
>  afe0eee 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
>  980964f 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
>  684d7f6 
> 
> Diff: https://reviews.apache.org/r/36815/diff/
> 
> 
> Testing
> ---
> 
> Refactored DefaultIndexRequestFactory to make it easier to subclass and 
> customize to handle version and version_type parameters.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-28 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/
---

(Updated July 29, 2015, 6:18 a.m.)


Review request for samza and Dan Harvey.


Changes
---

Added log warning for unexpected response types


Repository: samza


Description
---

SAMZA-741 Add support for versioning to Elasticsearch System Producer


Diffs (updated)
-

  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 f61bd36 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 e3b635b 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
 afe0eee 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
 980964f 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
 684d7f6 

Diff: https://reviews.apache.org/r/36815/diff/


Testing
---

Refactored DefaultIndexRequestFactory to make it easier to subclass and 
customize to handle version and version_type parameters.


Thanks,

Roger Hoover



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-28 Thread Roger Hoover


> On July 29, 2015, 5:47 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
> >  line 149
> > <https://reviews.apache.org/r/36815/diff/4/?file=1024086#file1024086line149>
> >
> > Quick question: Is it guaranteed that there is no DeleteResponse here? 
> > It would be good to at least log a warn if we get an unexpected response 
> > here.

It is guaranteed that you will not get a DeleteResponse back because the 
producer currently only allows IndexRequests.  In the future, if it supports 
DeleteRequest then we should add a counter metric for deletes.


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/#review93395
---


On July 29, 2015, 5:17 a.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36815/
> ---
> 
> (Updated July 29, 2015, 5:17 a.m.)
> 
> 
> Review request for samza and Dan Harvey.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-741 Add support for versioning to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  f61bd36 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  e3b635b 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
>  afe0eee 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
>  980964f 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
>  684d7f6 
> 
> Diff: https://reviews.apache.org/r/36815/diff/
> 
> 
> Testing
> ---
> 
> Refactored DefaultIndexRequestFactory to make it easier to subclass and 
> customize to handle version and version_type parameters.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: [DISCUSS] Release 0.10.0

2015-07-28 Thread Roger Hoover
Thanks, Yi.

I propose that we also include SAMZA-741 for Elasticsearch versioning
support with the new ES producer.  I think it's very close to being merged.

Roger


On Tue, Jul 28, 2015 at 10:08 PM, Yi Pan  wrote:

> Hi, all,
>
> I want to start the discussion on the release schedule for 0.10.0. There
> are a few important features that we plan to release in 0.10.0 and I want
> to start this thread s.t. we can agree on what to include in 0.10.0
> release.
>
> There are the following main features added in 0.10.0:
> - RocksDB TTL support
> - Add CoordinatorStream and disable CheckpointManager
> - Elasticsearch Producer
> - Host affinity
> And other 0.10.0 tickets:
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SAMZA%20AND%20status%20in%20(%22In%20Progress%22%2C%20Resolved%2C%20Closed)%20AND%20fixVersion%20%3D%200.10.0
>
> I propose to cut a 0.10.0 release after we get the following issues
> resolved:
> - SAMZA-615: Migrate checkpoint from checkpoint topic to Coordinator stream
> - SAMZA-617: YARN host affinity in Samza
>
> Thoughts?
>
> Thanks!
>
> -Yi
>


Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-28 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/
---

(Updated July 29, 2015, 5:17 a.m.)


Review request for samza and Dan Harvey.


Changes
---

Updates based on Dan's feedback


Repository: samza


Description
---

SAMZA-741 Add support for versioning to Elasticsearch System Producer


Diffs (updated)
-

  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 f61bd36 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 e3b635b 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
 afe0eee 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
 980964f 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
 684d7f6 

Diff: https://reviews.apache.org/r/36815/diff/


Testing
---

Refactored DefaultIndexRequestFactory to make it easier to subclass and 
customize to handle version and version_type parameters.


Thanks,

Roger Hoover



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-28 Thread Roger Hoover


> On July 28, 2015, 7:36 a.m., Dan Harvey wrote:
> > samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
> >  line 115
> > <https://reviews.apache.org/r/36815/diff/3/?file=1023400#file1023400line115>
> >
> > could switch these around so you've got 
> > getStatus().equals(RestStatus.CONFLICT), fewer nots.

Ok. I reworked it to not have nots.  :)


> On July 28, 2015, 7:36 a.m., Dan Harvey wrote:
> > samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
> >  line 117
> > <https://reviews.apache.org/r/36815/diff/3/?file=1023400#file1023400line117>
> >
> > I don't think the LOGGER check here is needed?

Yeah, not necessary here.


> On July 28, 2015, 7:36 a.m., Dan Harvey wrote:
> > samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
> >  line 118
> > <https://reviews.apache.org/r/36815/diff/3/?file=1023400#file1023400line118>
> >
> > I think it's worth adding a metric here too so we know how many 
> > conflicts occur. Also, does the log message from Elasticsearch fit in with 
> > the Samza ones, and is it easy to understand?

Added them in the updateSuccessMetrics() method


> On July 28, 2015, 7:36 a.m., Dan Harvey wrote:
> > samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
> >  line 124
> > <https://reviews.apache.org/r/36815/diff/3/?file=1023400#file1023400line124>
> >
> > It is odd that a method named updateSuccessMetrics() returns the 
> > number of writes. Could we just leave the log line as it was? 
> > 
> > Or it could compute the difference in the metrics before and after 
> > calling updateSuccessMetrics() here? That might make more sense.

Leaving it as is seemed misleading since version conflicts are not successfully 
written.  Before/after comparison seems a little messy so I moved the log line 
to the updateSuccessMetrics() method
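
A rough sketch of that accounting, assuming the Elasticsearch RestStatus enum and
Samza Counter metrics (the counter names here are placeholders rather than the
actual fields in the patch):

```
import org.apache.samza.metrics.Counter;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConflictAccountingSketch {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConflictAccountingSketch.class);

    static void recordFailure(BulkItemResponse itemResp, Counter conflicts, Counter failures) {
        if (!itemResp.isFailed()) {
            return;
        }
        if (RestStatus.CONFLICT.equals(itemResp.getFailure().getStatus())) {
            // An older version arrived after a newer one; with external versioning this is
            // expected, so count it separately instead of treating it as an error.
            conflicts.inc();
            LOGGER.info("Ignoring version conflict: {}", itemResp.getFailureMessage());
        } else {
            failures.inc();
            LOGGER.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
        }
    }
}
```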


> On July 28, 2015, 7:36 a.m., Dan Harvey wrote:
> > samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java,
> >  line 92
> > <https://reviews.apache.org/r/36815/diff/3/?file=1023402#file1023402line92>
> >
> > I know I set this line before but reading the elasticsearch docs they 
> > recommend using  `Requests.indexRequest(index).type(type)`

Sounds good.


> On July 28, 2015, 7:36 a.m., Dan Harvey wrote:
> > samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
> >  line 139
> > <https://reviews.apache.org/r/36815/diff/3/?file=1023400#file1023400line139>
> >
> > This already gets logged and thrown in flush(), is this to see the 
> > exception sooner?

These are the individual exceptions per document.  They're not being logged in 
the flush() method.  Only batch-level errors are saved and logged in the flush. 
 I wasn't seeing any of the document level errors in the log.


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/#review93249
---


On July 28, 2015, 6:15 a.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36815/
> ---
> 
> (Updated July 28, 2015, 6:15 a.m.)
> 
> 
> Review request for samza and Dan Harvey.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-741 Add support for versioning to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  f61bd36 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  e3b635b 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
>  afe0eee 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
>  980964f 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
>  684d7f6 
> 
> Diff: https://reviews.apache.org/r/36815/diff/
> 
> 
> Testing
> ---
> 
> Refactored DefaultIndexRequestFactory to make it easier to subclass and 
> customize to handle version and version_type parameters.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-27 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/
---

(Updated July 28, 2015, 6:13 a.m.)


Review request for samza.


Changes
---

Ignoring version conflicts


Repository: samza


Description
---

SAMZA-741 Add support for versioning to Elasticsearch System Producer


Diffs (updated)
-

  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 f61bd36 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 e3b635b 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
 afe0eee 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
 980964f 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
 684d7f6 

Diff: https://reviews.apache.org/r/36815/diff/


Testing
---

Refactored DefaultIndexRequestFactory to make it easier to subclass and 
customize to handle version and version_type parameters.


Thanks,

Roger Hoover



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-25 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/
---

(Updated July 25, 2015, 4:48 p.m.)


Review request for samza.


Repository: samza


Description
---

SAMZA-741 Add support for versioning to Elasticsearch System Producer


Diffs (updated)
-

  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
 afe0eee 

Diff: https://reviews.apache.org/r/36815/diff/


Testing
---

Refactored DefaultIndexRequestFactory to make it easier to subclass and 
customize to handle version and version_type parameters.


Thanks,

Roger Hoover



Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-25 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/
---

Review request for samza.


Repository: samza


Description
---

SAMZA-741 Add support for versioning to Elasticsearch System Producer


Diffs
-

  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
 afe0eee 

Diff: https://reviews.apache.org/r/36815/diff/


Testing
---

Refactored DefaultIndexRequestFactory to make it easier to subclass and 
customize to handle version and version_type parameters.


Thanks,

Roger Hoover



Re: Kafka broker error from samza producer

2015-07-23 Thread Roger Hoover
I forgot to mention that for me the error always happened after restarting a 
broker.

Sent from my iPhone

> On Jul 23, 2015, at 4:25 PM, Jordan Shaw  wrote:
> 
> Hey Roger,
> I restarted the producer and the error went away on the broker. If it comes
> back I'll switch over to lz4. Thanks for the reply.
> -Jordan
> 
> On Thu, Jul 23, 2015 at 9:32 AM, Roger Hoover 
> wrote:
> 
>> Hi Jordan,
>> 
>> I ran into a similar issue when using snappy compression and the new
>> producer.   If you disable compression or switch to lz4 or gzip, does the
>> issue go away?
>> 
>> Cheers,
>> 
>> Roger
>> 
>>> On Wed, Jul 22, 2015 at 11:54 PM, Jordan Shaw  wrote:
>>> 
>>> Hey Everyone,
>>> I'm getting an:
>>> "kafka.message.InvalidMessageException: Message found with corrupt size
>>> (0)"
>>> 
>>> in my kafka server.log here is the full stack trace:
>>> https://gist.github.com/jshaw86/516cf47b6fd7559e7dc1.
>>> 
>>> It indicates that the error is caused by a bad produce call from the Samza
>>> producer; any idea what could be causing this? Just about the only thing
>>> that I can find is maybe an issue with snappy or compression, but I don't
>>> see a snappy call in the traceback.
>>> 
>>> --
>>> Jordan Shaw
>>> Full Stack Software Engineer
>>> PubNub Inc
> 
> 
> 
> -- 
> Jordan Shaw
> Full Stack Software Engineer
> PubNub Inc
> 1045 17th St
> San Francisco, CA 94107


How to map document version to the Elasticsearch System Producer?

2015-07-23 Thread Roger Hoover
Hi Dan and Samza devs,

I have a use case for which I need to set an external version on
Elasticsearch documents.  Versioning (
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#index-versioning)
lets you prevent duplicate messages from temporarily overwriting new
versions of a document with old ones.

Currently, the Elasticsearch system producer does not support setting
versions.  Since Kafka/Samza don't have support for key/value headers in
messages, I think the best approach is to embed metadata into the stream
name.

We can add a version and version_type as options to the stream name.  These
match up with the Elasticsearch REST API (
https://www.elastic.co/blog/elasticsearch-versioning-support)

{index-name}/{type-name}?version={version-id}&version_type={version-type}
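
In task code, sending with those options would look something like the sketch below
(the system name, index, and type are placeholders, and the final syntax depends on
how SAMZA-741 lands):

```
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;

public class VersionedIndexSketch {
    // "elasticsearch" is assumed to be the system name configured for the producer.
    static void send(MessageCollector collector, String docId, Object doc, long version) {
        String stream = "machines/instance?version=" + version + "&version_type=external";
        collector.send(new OutgoingMessageEnvelope(new SystemStream("elasticsearch", stream), docId, doc));
    }
}
```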

I've created a JIRA (https://issues.apache.org/jira/browse/SAMZA-741).  I'd
appreciate your feedback.

Thanks,

Roger


Re: Kafka broker error from samza producer

2015-07-23 Thread Roger Hoover
Hi Jordan,

I ran into a similar issue when using snappy compression and the new
producer.   If you disable compression or switch to lz4 or gzip, does the
issue go away?

Cheers,

Roger

On Wed, Jul 22, 2015 at 11:54 PM, Jordan Shaw  wrote:

> Hey Everyone,
> I'm getting an:
>  "kafka.message.InvalidMessageException: Message found with corrupt size
> (0)"
>
> in my kafka server.log here is the full stack trace:
> https://gist.github.com/jshaw86/516cf47b6fd7559e7dc1.
>
> It indicates that the error is caused by a bad produce call from the Samza
> producer; any idea what could be causing this? Just about the only thing
> that I can find is maybe an issue with snappy or compression, but I don't see
> a snappy call in the traceback.
>
> --
> Jordan Shaw
> Full Stack Software Engineer
> PubNub Inc
>


Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-22 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/
---

(Updated July 22, 2015, 5 p.m.)


Review request for samza.


Repository: samza


Description
---

SAMZA-733 Add metrics to Elasticsearch System Producer


Diffs (updated)
-

  samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java 
PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala 
8eac8ef 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
 a277b69 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 7eb14a2 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 PRE-CREATION 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
 PRE-CREATION 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
 e63d62c 

Diff: https://reviews.apache.org/r/36473/diff/


Testing
---

Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
stream and that the metrics correctly count how many Elasticsearch documents 
were created and indexed.


Thanks,

Roger Hoover



Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-21 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/
---

(Updated July 22, 2015, 4:07 a.m.)


Review request for samza.


Repository: samza


Description
---

SAMZA-733 Add metrics to Elasticsearch System Producer


Diffs (updated)
-

  samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java 
PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala 
8eac8ef 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
 a277b69 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 7eb14a2 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 PRE-CREATION 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
 PRE-CREATION 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
 e63d62c 

Diff: https://reviews.apache.org/r/36473/diff/


Testing
---

Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
stream and that the metrics correctly count how many Elasticsearch documents 
were created and indexed.


Thanks,

Roger Hoover



Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-21 Thread Roger Hoover


> On July 21, 2015, 5:42 p.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala, 
> > lines 36-37
> > <https://reviews.apache.org/r/36473/diff/3/?file=1017436#file1017436line36>
> >
> > though it works, I prefer to use "def" here, not only because it has 
> > less overhead, but also to keep all the methods consistent for better 
> > readability. What do you think?
> 
> Roger Hoover wrote:
> Sounds good.  I only baulked on it the first time because I'm not that 
> skilled with Scala type declarations yet. :)  I can make this work.

I take it back.  It seems it [can't be 
done](http://www.scala-lang.org/old/node/5082)


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review92429
---


On July 21, 2015, 5:41 a.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36473/
> ---
> 
> (Updated July 21, 2015, 5:41 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-733 Add metrics to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java 
> PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala 
> 8eac8ef 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
>  a277b69 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  7eb14a2 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  PRE-CREATION 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
>  PRE-CREATION 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
>  e63d62c 
> 
> Diff: https://reviews.apache.org/r/36473/diff/
> 
> 
> Testing
> ---
> 
> Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
> stream and that the metrics correctly count how many Elasticsearch documents 
> were created and indexed.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-21 Thread Roger Hoover


> On July 21, 2015, 5:42 p.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java, line 23
> > <https://reviews.apache.org/r/36473/diff/3/?file=1017435#file1017435line23>
> >
> > of the class "the" extends -> of the class "that" extends

Thanks


> On July 21, 2015, 5:42 p.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java, line 29
> > <https://reviews.apache.org/r/36473/diff/3/?file=1017435#file1017435line29>
> >
> > can we also have a constructor with the default prefix ""?

Good idea


> On July 21, 2015, 5:42 p.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala, 
> > lines 36-37
> > <https://reviews.apache.org/r/36473/diff/3/?file=1017436#file1017436line36>
> >
> > though it works, I prefer to use "def" here, not only because it has 
> > less overhead, but also to keep all the methods consistent for better 
> > readability. What do you think?

Sounds good.  I only baulked on it the first time because I'm not that skilled 
with Scala type declarations yet. :)  I can make this work.


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review92429
---


On July 21, 2015, 5:41 a.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36473/
> ---
> 
> (Updated July 21, 2015, 5:41 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-733 Add metrics to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java 
> PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala 
> 8eac8ef 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
>  a277b69 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  7eb14a2 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  PRE-CREATION 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
>  PRE-CREATION 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
>  e63d62c 
> 
> Diff: https://reviews.apache.org/r/36473/diff/
> 
> 
> Testing
> ---
> 
> Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
> stream and that the metrics correctly count how many Elasticsearch documents 
> were created and indexed.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-20 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/
---

(Updated July 21, 2015, 5:41 a.m.)


Review request for samza.


Changes
---

Fixed tests and added base class for Java metrics


Repository: samza


Description
---

SAMZA-733 Add metrics to Elasticsearch System Producer


Diffs (updated)
-

  samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java 
PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala 
8eac8ef 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
 a277b69 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 7eb14a2 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 PRE-CREATION 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
 PRE-CREATION 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
 e63d62c 

Diff: https://reviews.apache.org/r/36473/diff/


Testing
---

Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
stream and that the metrics correctly count how many Elasticsearch documents 
were created and indexed.


Thanks,

Roger Hoover



Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-14 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/
---

(Updated July 15, 2015, 4:45 a.m.)


Review request for samza.


Changes
---

Removed extra space


Repository: samza


Description
---

SAMZA-733 Add metrics to Elasticsearch System Producer


Diffs (updated)
-

  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
 a277b69 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 7eb14a2 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 PRE-CREATION 

Diff: https://reviews.apache.org/r/36473/diff/


Testing
---

Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
stream and that the metrics correctly count how many Elasticsearch documents 
were created and indexed.


Thanks,

Roger Hoover



Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-14 Thread Roger Hoover


> On July 14, 2015, 9:44 p.m., Yan Fang wrote:
> > samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java,
> >  line 24
> > <https://reviews.apache.org/r/36473/diff/1/?file=1010788#file1010788line24>
> >
> > can this class extend MetricsHelper? This could simplify it a little.
> 
> Roger Hoover wrote:
> I don't see how it simplifies things because I have to implement all the 
> methods in the Scala trait.  I'm having trouble getting the newGauge 
> signatures to match.
> 
> ```
> public class ElasticsearchSystemProducerMetrics implements MetricsHelper {
>     public final Counter bulkSendSuccess;
>     public final Counter inserts;
>     public final Counter updates;
>     private final MetricsRegistry registry;
>     private final String group;
>     private final String systemName;
> 
>     public interface JFunction<R> {
>         R apply();
>     }
> 
>     public ElasticsearchSystemProducerMetrics(String systemName, MetricsRegistry registry) {
>         group = this.getClass().getName();
>         this.registry = registry;
>         this.systemName = systemName;
> 
>         bulkSendSuccess = newCounter("bulk-send-success");
>         inserts = newCounter("docs-inserted");
>         updates = newCounter("docs-updated");
>     }
> 
>     @Override
>     public Counter newCounter(String name) {
>         return MetricsHelper$class.newCounter(this, name);
>     }
> 
>     @Override
>     public <T> Gauge<T> newGauge(String name, T value) {
>         return MetricsHelper$class.newGauge(this, name, value);
>     }
> 
>     @Override
>     public <T> Gauge<T> newGauge(String name, JFunction<T> value) {
>         return null;
>     }
> 
>     @Override
>     public Timer newTimer(String name) {
>         return MetricsHelper$class.newTimer(this, name);
>     }
> 
>     @Override
>     public String getPrefix() {
>         return systemName + "-";
>     }
> 
>     @Override
>     public MetricsRegistry registry() {
>         return registry;
>     }
> 
>     @Override
>     public String group() {
>         return group;
>     }
> }
> ```

We really only need counters for this class but have to figure out how to 
implement the Scala newGauge methods which are tricky.  Would appreciate help 
if you know how to do it.


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review91670
---


On July 14, 2015, 6:12 a.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36473/
> ---
> 
> (Updated July 14, 2015, 6:12 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-733 Add metrics to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
>  a277b69 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  7eb14a2 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/36473/diff/
> 
> 
> Testing
> ---
> 
> Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
> stream and that the metrics correctly count how many Elasticsearch documents 
> were created and indexed.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-14 Thread Roger Hoover


> On July 14, 2015, 9:44 p.m., Yan Fang wrote:
> > samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
> >  line 152
> > <https://reviews.apache.org/r/36473/diff/1/?file=1010787#file1010787line152>
> >
> > remove the space

Sure thing.


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review91670
---


On July 14, 2015, 6:12 a.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36473/
> ---
> 
> (Updated July 14, 2015, 6:12 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-733 Add metrics to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
>  a277b69 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  7eb14a2 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/36473/diff/
> 
> 
> Testing
> ---
> 
> Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
> stream and that the metrics correctly count how many Elasticsearch documents 
> were created and indexed.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-14 Thread Roger Hoover


> On July 14, 2015, 9:44 p.m., Yan Fang wrote:
> > samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java,
> >  line 24
> > <https://reviews.apache.org/r/36473/diff/1/?file=1010788#file1010788line24>
> >
> > can this class extend MetricsHelper? This could simplify it a little.

I don't see how it simplifies things because I have to implement all the 
methods in the Scala trait.  I'm having trouble getting the newGauge signatures 
to match.

```
public class ElasticsearchSystemProducerMetrics implements MetricsHelper {
    public final Counter bulkSendSuccess;
    public final Counter inserts;
    public final Counter updates;
    private final MetricsRegistry registry;
    private final String group;
    private final String systemName;

    public interface JFunction<R> {
        R apply();
    }

    public ElasticsearchSystemProducerMetrics(String systemName, MetricsRegistry registry) {
        group = this.getClass().getName();
        this.registry = registry;
        this.systemName = systemName;

        bulkSendSuccess = newCounter("bulk-send-success");
        inserts = newCounter("docs-inserted");
        updates = newCounter("docs-updated");
    }

    @Override
    public Counter newCounter(String name) {
        return MetricsHelper$class.newCounter(this, name);
    }

    @Override
    public <T> Gauge<T> newGauge(String name, T value) {
        return MetricsHelper$class.newGauge(this, name, value);
    }

    @Override
    public <T> Gauge<T> newGauge(String name, JFunction<T> value) {
        return null;
    }

    @Override
    public Timer newTimer(String name) {
        return MetricsHelper$class.newTimer(this, name);
    }

    @Override
    public String getPrefix() {
        return systemName + "-";
    }

    @Override
    public MetricsRegistry registry() {
        return registry;
    }

    @Override
    public String group() {
        return group;
    }
}
```


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review91670
---


On July 14, 2015, 6:12 a.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36473/
> ---
> 
> (Updated July 14, 2015, 6:12 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-733 Add metrics to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
>  a277b69 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  7eb14a2 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/36473/diff/
> 
> 
> Testing
> ---
> 
> Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
> stream and that the metrics correctly count how many Elasticsearch documents 
> were created and indexed.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Metrics for Elasticsearch System Producer

2015-07-13 Thread Roger Hoover
Hi all,

I've started using the new Elasticsearch System Producer (many thanks,
Dan!) and decided to add some metrics to it.

The JIRA ticket and review request links are here:

https://issues.apache.org/jira/browse/SAMZA-733

https://reviews.apache.org/r/36473/

Cheers,

Roger


Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-13 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/
---

Review request for samza.


Repository: samza


Description
---

SAMZA-733 Add metrics to Elasticsearch System Producer


Diffs
-

  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
 a277b69 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 7eb14a2 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 PRE-CREATION 

Diff: https://reviews.apache.org/r/36473/diff/


Testing
---

Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
stream and that the metrics correctly count how many Elasticsearch documents 
were created and indexed.


Thanks,

Roger Hoover



Re: Thoughts and observations on Samza

2015-07-10 Thread Roger Hoover
That's great.  Thanks, Jay.

On Fri, Jul 10, 2015 at 8:46 AM, Jay Kreps  wrote:

> Yeah totally agree. I think you have this issue even today, right? I.e. if
> you need to make a simple config change and you're running in YARN today
> you end up bouncing the job which then rebuilds state. I think the fix is
> exactly what you described which is to have a long timeout on partition
> movement for stateful jobs so that if a job is just getting bounced, and
> the cluster manager (or admin) is smart enough to restart it on the same
> host when possible, it can optimistically reuse any existing state it finds
> on disk (if it is valid).
>
> So in this model the charter of the CM is to place processes as stickily as
> possible and to restart or re-place failed processes. The charter of the
> partition management system is to control the assignment of work to these
> processes. The nice thing about this is that the work assignment, timeouts,
> behavior, configs, and code will all be the same across all cluster
> managers.
>
> So I think that prototype would actually give you exactly what you want
> today for any cluster manager (or manual placement + restart script) that
> was sticky in terms of host placement since there is already a configurable
> partition movement timeout and task-by-task state reuse with a check on
> state validity.
>
> -Jay
>
> On Fri, Jul 10, 2015 at 8:34 AM, Roger Hoover 
> wrote:
>
> > That would be great to let Kafka do as much heavy lifting as possible and
> > make it easier for other languages to implement Samza apis.
> >
> > One thing to watch out for is the interplay between Kafka's group
> > management and the external scheduler/process manager's fault tolerance.
> > If a container dies, the Kafka group membership protocol will try to
> assign
> > its tasks to other containers while at the same time the process manager
> > is trying to relaunch the container.  Without some consideration for this
> > (like a configurable amount of time to wait before Kafka alters the group
> > membership), there may be thrashing going on which is especially bad for
> > containers with large amounts of local state.
> >
> > Someone else pointed this out already but I thought it might be worth
> > calling out again.
> >
> > Cheers,
> >
> > Roger
> >
> >
> > On Tue, Jul 7, 2015 at 11:35 AM, Jay Kreps  wrote:
> >
> > > Hey Roger,
> > >
> > > I couldn't agree more. We spent a bunch of time talking to people and
> > that
> > > is exactly the stuff we heard time and again. What makes it hard, of
> > > course, is that there is some tension between compatibility with what's
> > > there now and making things better for new users.
> > >
> > > I also strongly agree with the importance of multi-language support. We
> > are
> > > talking now about Java, but for application development use cases
> people
> > > want to work in whatever language they are using elsewhere. I think
> > moving
> > > to a model where Kafka itself does the group membership, lifecycle
> > control,
> > > and partition assignment has the advantage of putting all that complex
> > > stuff behind a clean api that the clients are already going to be
> > > implementing for their consumer, so the added functionality for stream
> > > processing beyond a consumer becomes very minor.
> > >
> > > -Jay
> > >
> > > On Tue, Jul 7, 2015 at 10:49 AM, Roger Hoover 
> > > wrote:
> > >
> > > > Metamorphosis...nice. :)
> > > >
> > > > This has been a great discussion.  As a user of Samza who's recently
> > > > integrated it into a relatively large organization, I just want to
> add
> > > > support to a few points already made.
> > > >
> > > > The biggest hurdles to adoption of Samza as it currently exists that
> > I've
> > > > experienced are:
> > > > 1) YARN - YARN is overly complex in many environments where Puppet
> > would
> > > do
> > > > just fine but it was the only mechanism to get fault tolerance.
> > > > 2) Configuration - I think I like the idea of configuring most of the
> > job
> > > > in code rather than config files.  In general, I think the goal
> should
> > be
> > > > to make it harder to make mistakes, especially of the kind where the
> > code
> > > > expects something and the config doesn't match.  The current config
> is
> > > > quite intri

Re: Thoughts and observations on Samza

2015-07-10 Thread Roger Hoover
That would be great to let Kafka do as much heavy lifting as possible and
make it easier for other languages to implement Samza apis.

One thing to watch out for is the interplay between Kafka's group
management and the external scheduler/process manager's fault tolerance.
If a container dies, the Kafka group membership protocol will try to assign
its tasks to other containers while at the same time the process manager
is trying to relaunch the container.  Without some consideration for this
(like a configurable amount of time to wait before Kafka alters the group
membership), there may be thrashing going on which is especially bad for
containers with large amounts of local state.

Someone else pointed this out already but I thought it might be worth
calling out again.

Cheers,

Roger


On Tue, Jul 7, 2015 at 11:35 AM, Jay Kreps  wrote:

> Hey Roger,
>
> I couldn't agree more. We spent a bunch of time talking to people and that
> is exactly the stuff we heard time and again. What makes it hard, of
> course, is that there is some tension between compatibility with what's
> there now and making things better for new users.
>
> I also strongly agree with the importance of multi-language support. We are
> talking now about Java, but for application development use cases people
> want to work in whatever language they are using elsewhere. I think moving
> to a model where Kafka itself does the group membership, lifecycle control,
> and partition assignment has the advantage of putting all that complex
> stuff behind a clean api that the clients are already going to be
> implementing for their consumer, so the added functionality for stream
> processing beyond a consumer becomes very minor.
>
> -Jay
>
> On Tue, Jul 7, 2015 at 10:49 AM, Roger Hoover 
> wrote:
>
> > Metamorphosis...nice. :)
> >
> > This has been a great discussion.  As a user of Samza who's recently
> > integrated it into a relatively large organization, I just want to add
> > support to a few points already made.
> >
> > The biggest hurdles to adoption of Samza as it currently exists that I've
> > experienced are:
> > 1) YARN - YARN is overly complex in many environments where Puppet would
> do
> > just fine but it was the only mechanism to get fault tolerance.
> > 2) Configuration - I think I like the idea of configuring most of the job
> > in code rather than config files.  In general, I think the goal should be
> > to make it harder to make mistakes, especially of the kind where the code
> > expects something and the config doesn't match.  The current config is
> > quite intricate and error-prone.  For example, the application logic may
> > depend on bootstrapping a topic but rather than asserting that in the
> code,
> > you have to rely on getting the config right.  Likewise with serdes, the
> > Java representations produced by various serdes (JSON, Avro, etc.) are
> not
> > equivalent so you cannot just reconfigure a serde without changing the
> > code.   It would be nice for jobs to be able to assert what they expect
> > from their input topics in terms of partitioning.  This is getting a
> little
> > off topic but I was even thinking about creating a "Samza config linter"
> > that would sanity check a set of configs.  Especially in organizations
> > where config is managed by a different team than the application
> developer,
> > it's very hard to get avoid config mistakes.
> > 3) Java/Scala centric - for many teams (especially DevOps-type folks),
> the
> > pain of the Java toolchain (maven, slow builds, weak command line
> support,
> > configuration over convention) really inhibits productivity.  As more and
> > more high-quality clients become available for Kafka, I hope they'll
> follow
> > Samza's model.  Not sure how much it affects the proposals in this thread
> > but please consider other languages in the ecosystem as well.  From what
> > I've heard, Spark has more Python users than Java/Scala.
> > (FYI, we added a Jython wrapper for the Samza API
> >
> >
> https://github.com/Quantiply/rico/tree/master/jython/src/main/java/com/quantiply/samza
> > and are working on a Yeoman generator
> > https://github.com/Quantiply/generator-rico for Jython/Samza projects to
> > alleviate some of the pain)
> >
> > I also want to underscore Jay's point about improving the user
> experience.
> > That's a very important factor for adoption.  I think the goal should be
> to
> > make Samza as easy to get started with as something like Logstash.
> > Logstash is vastly inferior in terms of capabilities to Samza but it's
> > easy to get started and that makes a big difference.

Re: Samza job on YARN stuck Unassigned

2015-07-10 Thread Roger Hoover
Hi Krzysztof,

I haven't seen that error before.  It does sound like it could be a
connection issue.  Did you check that the YARN node has access
to hdfs:///user/samza/deploy/event-log-etl-nested-0.1.0-dist.tar.gz?

One way to set the AM and containers to debug is to include a log4j.xml
file in your tar.gz under the lib folder.  There is special logic in the start
scripts (
https://github.com/apache/samza/blob/master/samza-shell/src/main/bash/run-class.sh#L40)
that checks for that path; note that it doesn't work with log4j.properties, for
example.
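
For reference, here is a bare-bones sketch of what that lib/log4j.xml could
look like for turning everything up to DEBUG.  The appender name and pattern
below are just placeholders I picked, not anything Samza requires:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
  <appender name="console" class="org.apache.log4j.ConsoleAppender">
    <layout class="org.apache.log4j.PatternLayout">
      <param name="ConversionPattern" value="%d %-5p %c{1} - %m%n"/>
    </layout>
  </appender>
  <root>
    <priority value="debug"/>
    <appender-ref ref="console"/>
  </root>
</log4j:configuration>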

Cheers,

Roger



On Fri, Jul 10, 2015 at 4:18 AM, Krzysztof Zarzycki 
wrote:

> Hi there Samza developers,
>
> I have a problem that I cannot overcome with deploying Samza task on YARN.
> When I submitted the task, ApplicationMasters get created (2 of them), job
> is visible, but in state UNASSIGNED. After some time the job FAILED.
>
> application information on resource manager panel is :
> State: FAILED
> FinalStatus: FAILED
> Elapsed: 25mins, 2sec
> Diagnostics: Application application_1424354741837_0380 failed 2 times due
> to ApplicationMaster for attempt appattempt_1424354741837_0380_02 timed
> out. Failing the application.
>
>
> When I look into the logs of ApplicationMaster I see no errors, no
> warnings, anything wrong: Please see the output of "yarn logs" comand
> attached.
>
> My guess would be that connection failed between some components
> (container to ApplicationMaster? NodeManager? ).  I suspect that when
> looking at jstack output in the AM:
>
> "main" #1 prio=5 os_prio=0 tid=0x7f9338015000 nid=0x6f2f waiting on
> condition [0x7f933de6e000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   at java.lang.Thread.sleep(Native Method)
>   at
> org.apache.hadoop.util.ThreadUtil.sleepAtLeastIgnoreInterrupts(ThreadUtil.java:43)
>   at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:154)
>   at com.sun.proxy.$Proxy18.registerApplicationMaster(Unknown Source)
>   at
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.registerApplicationMaster(AMRMClientImpl.java:196)
>   at
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl.registerApplicationMaster(AMRMClientAsyncImpl.java:138)
>   at
> org.apache.samza.job.yarn.SamzaAppMasterLifecycle.onInit(SamzaAppMasterLifecycle.scala:39)
>   at
> org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$run$1.apply(SamzaAppMaster.scala:108)
>   at
> org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$run$1.apply(SamzaAppMaster.scala:108)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at
> org.apache.samza.job.yarn.SamzaAppMaster$.run(SamzaAppMaster.scala:108)
>   at
> org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:95)
>   at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
>
>
> On the other hand I see in logs correct RM addresses:
> 15/07/10 12:17:30 INFO client.RMProxy: Connecting to ResourceManager at
> hdnn02.company.com/148.251.82.11:8030
> 15/07/10 12:17:31 INFO client.RMProxy: Connecting to ResourceManager at
> hdnn02.company.com/148.251.82.11:8050
> ...
> 2015-07-10 12:17:31,032 [main] INFO  o.apache.samza.job.yarn.ClientHelper
> - trying to connect to RM hdnn02.company.com:8050
> ...
> 2015-07-10 12:17:31,680 [main] INFO  o.a.s.job.yarn.SamzaAppMasterService
> - Webapp is started at (rpc http://78.46.56.88:43268/, tracking http://
>
>
> Does anyone know what could be wrong here? I'll be grateful for any help,
> also in just debugging the case.
> I start with a simple question: do you know how to set log4j for AM &
> containers to DEBUG?
>
> Thank you!
> Krzysztof
>
>
>


Re: Thoughts and observations on Samza

2015-07-07 Thread Roger Hoover
Metamorphosis...nice. :)

This has been a great discussion.  As a user of Samza who's recently
integrated it into a relatively large organization, I just want to add
support to a few points already made.

The biggest hurdles to adoption of Samza as it currently exists that I've
experienced are:
1) YARN - YARN is overly complex in many environments where Puppet would do
just fine but it was the only mechanism to get fault tolerance.
2) Configuration - I think I like the idea of configuring most of the job
in code rather than config files.  In general, I think the goal should be
to make it harder to make mistakes, especially of the kind where the code
expects something and the config doesn't match.  The current config is
quite intricate and error-prone.  For example, the application logic may
depend on bootstrapping a topic but rather than asserting that in the code,
you have to rely on getting the config right.  Likewise with serdes, the
Java representations produced by various serdes (JSON, Avro, etc.) are not
equivalent so you cannot just reconfigure a serde without changing the
code.   It would be nice for jobs to be able to assert what they expect
from their input topics in terms of partitioning.  This is getting a little
off topic but I was even thinking about creating a "Samza config linter"
that would sanity check a set of configs.  Especially in organizations
where config is managed by a different team than the application developer,
it's very hard to avoid config mistakes.
3) Java/Scala centric - for many teams (especially DevOps-type folks), the
pain of the Java toolchain (maven, slow builds, weak command line support,
configuration over convention) really inhibits productivity.  As more and
more high-quality clients become available for Kafka, I hope they'll follow
Samza's model.  Not sure how much it affects the proposals in this thread
but please consider other languages in the ecosystem as well.  From what
I've heard, Spark has more Python users than Java/Scala.
(FYI, we added a Jython wrapper for the Samza API
https://github.com/Quantiply/rico/tree/master/jython/src/main/java/com/quantiply/samza
and are working on a Yeoman generator
https://github.com/Quantiply/generator-rico for Jython/Samza projects to
alleviate some of the pain)

I also want to underscore Jay's point about improving the user experience.
That's a very important factor for adoption.  I think the goal should be to
make Samza as easy to get started with as something like Logstash.
Logstash is vastly inferior in terms of capabilities to Samza but it's easy
to get started and that makes a big difference.

Cheers,

Roger





On Tue, Jul 7, 2015 at 3:29 AM, Gianmarco De Francisci Morales <
g...@apache.org> wrote:

> Forgot to add. On the naming issues, Kafka Metamorphosis is a clear winner
> :)
>
> --
> Gianmarco
>
> On 7 July 2015 at 13:26, Gianmarco De Francisci Morales 
> wrote:
>
> > Hi,
> >
> > @Martin, thanks for your comments.
> > Maybe I'm missing some important point, but I think coupling the releases
> > is actually a *good* thing.
> > To make an example, would it be better if the MR and HDFS components of
> > Hadoop had different release schedules?
> >
> > Actually, keeping the discussion in a single place would make agreeing on
> > releases (and backwards compatibility) much easier, as everybody would be
> > responsible for the whole codebase.
> >
> > That said, I like the idea of absorbing samza-core as a sub-project, and
> > leave the fancy stuff separate.
> > It probably gives 90% of the benefits we have been discussing here.
> >
> > Cheers,
> >
> > --
> > Gianmarco
> >
> > On 7 July 2015 at 02:30, Jay Kreps  wrote:
> >
> >> Hey Martin,
> >>
> >> I agree coupling release schedules is a downside.
> >>
> >> Definitely we can try to solve some of the integration problems in
> >> Confluent Platform or in other distributions. But I think this ends up
> >> being really shallow. I guess I feel to really get a good user
> experience
> >> the two systems have to kind of feel like part of the same thing and you
> >> can't really add that in later--you can put both in the same
> downloadable
> >> tar file but it doesn't really give a very cohesive feeling. I agree
> that
> >> ultimately any of the project stuff is as much social and naming as
> >> anything else--theoretically two totally independent projects could work
> >> to
> >> tightly align. In practice this seems to be quite difficult though.
> >>
> >> For the frameworks--totally agree it would be good to maintain the
> >> framework support with the project. In some cases there may not be too
> >> much
> >> there since the integration gets lighter but I think whatever stubs you
> >> need should be included. So no I definitely wasn't trying to imply
> >> dropping
> >> support for these frameworks, just making the integration lighter by
> >> separating process management from partition management.
> >>
> >> You raise two good points we would have to figure out if we we

Re: How to configure log4j separately for the AM versus containers?

2015-06-23 Thread Roger Hoover
Ah, this seems to work.  I saw that YarnJob.scala was referencing __package
to launch the AM itself.

yarn.am.opts=-Xmx768m -XX:+UseSerialGC
-Dlog4j.configuration=file://$(pwd)/__package/lib/log4j-am.xml
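
In case it helps anyone else, here is a rough sketch of what such a
log4j-am.xml could look like, with the root logger at INFO for the AM while
the containers keep their own lib/log4j.xml at ERROR.  The file name,
appender, and pattern are my own choices, nothing Samza mandates:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
  <appender name="console" class="org.apache.log4j.ConsoleAppender">
    <layout class="org.apache.log4j.PatternLayout">
      <param name="ConversionPattern" value="%d %-5p %c{1} - %m%n"/>
    </layout>
  </appender>
  <root>
    <priority value="info"/>
    <appender-ref ref="console"/>
  </root>
</log4j:configuration>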

On Tue, Jun 23, 2015 at 12:40 PM, Roger Hoover 
wrote:

> Hi,
>
> I want the App Master to log at INFO level and the container to log at
> ERROR.  Is there a way to configure the AM to use a different log4j config
> file?
>
> I'm trying to set yarn.am.opts but couldn't get it to work with
> system properties.
>
> yarn.am.opts=-Xmx768m -XX:+UseSerialGC
> -Dlog4j.configuration=file://${user.dir}/lib/log4j-am.xml
>
> gives a bad substitution error.
>
> This actually works but it feels too ugly.
>
> yarn.am.opts=-Xmx768m -XX:+UseSerialGC -Dlog4j.configuration=file://$(pwd
> | cut -d/ -f-9 | xargs -IXXX -n1 find XXX -name log4j-am.xml -print)
>
> What's the right way to do it?
>
> Thanks,
>
> Roger
>


How to configure log4j separately for the AM versus containers?

2015-06-23 Thread Roger Hoover
Hi,

I want the App Master to log at INFO level and the container to log at
ERROR.  Is there a way to configure the AM to use a different log4j config
file?

I'm trying to set yarn.am.opts but couldn't get it to work with
system properties.

yarn.am.opts=-Xmx768m -XX:+UseSerialGC
-Dlog4j.configuration=file://${user.dir}/lib/log4j-am.xml

gives a bad substitution error.

This actually works but it feels too ugly.

yarn.am.opts=-Xmx768m -XX:+UseSerialGC -Dlog4j.configuration=file://$(pwd |
cut -d/ -f-9 | xargs -IXXX -n1 find XXX -name log4j-am.xml -print)

What's the right way to do it?

Thanks,

Roger


Re: [VOTE] Apache Samza 0.9.1 RC0

2015-06-22 Thread Roger Hoover
Yan,

I tested the patch locally and it looks good.  Creating a patched release for 
myself to test in our environment.  Thanks, again.

Sent from my iPhone

> On Jun 22, 2015, at 10:59 AM, Yi Pan  wrote:
> 
> Hi, Yan,
> 
> Thanks a lot for the quick fix on the mentioned bugs. It seems the fix for
> SAMZA-720 is pretty localized and I am OK to push it into 0.9.1. I will be
> working on back porting those changes to 0.9.1 later today and fix all the
> release related issues.
> 
> Thanks!
> 
> -Yi
> 
> On Mon, Jun 22, 2015 at 10:30 AM, Roger Hoover 
> wrote:
> 
>> Yan,
>> 
>> You rock.  Thank you so much for the quick fix.  I'm working on building
>> and testing the patch.
>> 
>> Cheers,
>> 
>> Roger
>> 
>>> On Mon, Jun 22, 2015 at 1:09 AM, Yan Fang  wrote:
>>> 
>>> Hi guys,
>>> 
>>> 1. I have the difficulty in building the 0.9.1 branch. I think this is
>>> mainly related to SAMZA-721
>>> <https://issues.apache.org/jira/browse/SAMZA-721>.
>>> 
>>> 2. Also, https://issues.apache.org/jira/browse/SAMZA-712 seems bothering
>>> people as well.
>>> 
>>> 3. https://issues.apache.org/jira/browse/SAMZA-720 is a critical bug we
>>> need to fix. Have already attached a patch.
>>> 
>>> 4. There is no maven staging link.
>>> 
>>> Thanks,
>>> 
>>> Fang, Yan
>>> yanfang...@gmail.com
>>> 
>>> On Sun, Jun 21, 2015 at 1:53 PM, Roger Hoover 
>>> wrote:
>>> 
>>>> Hi all,
>>>> 
>>>> Do you think we could get this bootstrapping bug fixed before 0.9.1
>>>> release?  It seems like a critical bug.
>>>> 
>>>> https://issues.apache.org/jira/browse/SAMZA-720
>>>> 
>>>> Thanks,
>>>> 
>>>> Roger
>>>> 
>>>> On Sat, Jun 20, 2015 at 10:38 PM, Yan Fang 
>> wrote:
>>>> 
>>>>> Agree. I will test it this weekend.
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Fang, Yan
>>>>> yanfang...@gmail.com
>>>>> 
>>>>>> On Sat, Jun 20, 2015 at 3:46 PM, Guozhang Wang 
>>>>> wrote:
>>>>> 
>>>>>> Since we only get one vote so far, I think I have to extend the
>> vote
>>>>>> deadline. Let's set it to next Monday 6pm.
>>>>>> 
>>>>>> Please check the candidate and vote for your opinions.
>>>>>> 
>>>>>> Guozhang
>>>>>> 
>>>>>> On Fri, Jun 19, 2015 at 10:03 AM, Yi Pan 
>>> wrote:
>>>>>> 
>>>>>>> +1. Ran the Samza failure test suite and succeeded over night.
>>>>>>> 
>>>>>>> On Wed, Jun 17, 2015 at 5:54 PM, Guozhang Wang <
>> wangg...@gmail.com
>>>> 
>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hey all,
>>>>>>>> 
>>>>>>>> This is a call for a vote on a release of Apache Samza 0.9.1.
>>> This
>>>>> is a
>>>>>>>> bug-fix release against 0.9.0.
>>>>>>>> 
>>>>>>>> The release candidate can be downloaded from here:
>>>>>>>> 
>>>>>>>> http://people.apache.org/~guozhang/samza-0.9.1-rc0/
>>>>>>>> 
>>>>>>>> The release candidate is signed with pgp key 911402D8, which is
>>>>>>>> included in the repository's KEYS file:
>> https://git-wip-us.apache.org/repos/asf?p=samza.git;a=blob_plain;f=KEYS;hb=6f5bafb6cd93934781161eb6b1868d11ea347c95
>>>>>>>> 
>>>>>>>> and can also be found on keyservers:
>>>>>>>> 
>>>>>>>> http://pgp.mit.edu/pks/lookup?op=get&search=0x911402D8
>>>>>>>> 
>>>>>>>> The git tag is release-0.9.1-rc0 and signed with the same pgp
>>> key:
>> https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=e78b9e7f34650538b4bb68b338eb472b98a5709e
>>>>>>>> 
>>>>>>>> Test binaries have been published to Maven's staging
>> repository,
>>>> and
>>>>>> are
>>>>>>>> available here:
>>>>>>>> 
>>>>>>>> 5 critical bugs were resolved for this release:
>> https://issues.apache.org/jira/browse/SAMZA-715?jql=project%20%3D%20SAMZA%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20%28Resolved%2C%20Closed%29
>>>>>>>> 
>>>>>>>> The vote will be open for 72 hours ( end in 6:00pm Saturday,
>>>>> 06/20/2015
>>>>>>> ).
>>>>>>>> Please download the release candidate, check the
>>> hashes/signature,
>>>>>> build
>>>>>>> it
>>>>>>>> and test it, and then please vote:
>>>>>>>> 
>>>>>>>> [ ] +1 approve
>>>>>>>> [ ] +0 no opinion
>>>>>>>> [ ] -1 disapprove (and reason why)
>>>>>>>> 
>>>>>>>> -- Guozhang
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> -- Guozhang
>> 


Re: [VOTE] Apache Samza 0.9.1 RC0

2015-06-22 Thread Roger Hoover
Yan,

 You rock.  Thank you so much for the quick fix.  I'm working on building
and testing the patch.

Cheers,

Roger

On Mon, Jun 22, 2015 at 1:09 AM, Yan Fang  wrote:

> Hi guys,
>
> 1. I have the difficulty in building the 0.9.1 branch. I think this is
> mainly related to SAMZA-721
> <https://issues.apache.org/jira/browse/SAMZA-721>.
>
> 2. Also, https://issues.apache.org/jira/browse/SAMZA-712 seems bothering
> people as well.
>
> 3. https://issues.apache.org/jira/browse/SAMZA-720 is a critical bug we
> need to fix. Have already attached a patch.
>
> 4. There is no maven staging link.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Sun, Jun 21, 2015 at 1:53 PM, Roger Hoover 
> wrote:
>
> > Hi all,
> >
> > Do you think we could get this bootstrapping bug fixed before 0.9.1
> > release?  It seems like a critical bug.
> >
> > https://issues.apache.org/jira/browse/SAMZA-720
> >
> > Thanks,
> >
> > Roger
> >
> > On Sat, Jun 20, 2015 at 10:38 PM, Yan Fang  wrote:
> >
> > > Agree. I will test it this weekend.
> > >
> > > Thanks,
> > >
> > > Fang, Yan
> > > yanfang...@gmail.com
> > >
> > > On Sat, Jun 20, 2015 at 3:46 PM, Guozhang Wang 
> > wrote:
> > >
> > > > Since we only get one vote so far, I think I have to extend the vote
> > > > deadline. Let's set it to next Monday 6pm.
> > > >
> > > > Please check the candidate and vote for your opinions.
> > > >
> > > > Guozhang
> > > >
> > > > On Fri, Jun 19, 2015 at 10:03 AM, Yi Pan 
> wrote:
> > > >
> > > > > +1. Ran the Samza failure test suite and succeeded over night.
> > > > >
> > > > > On Wed, Jun 17, 2015 at 5:54 PM, Guozhang Wang  >
> > > > wrote:
> > > > >
> > > > > > Hey all,
> > > > > >
> > > > > > This is a call for a vote on a release of Apache Samza 0.9.1.
> This
> > > is a
> > > > > > bug-fix release against 0.9.0.
> > > > > >
> > > > > > The release candidate can be downloaded from here:
> > > > > >
> > > > > > http://people.apache.org/~guozhang/samza-0.9.1-rc0/
> > > > > >
> > > > > > The release candidate is signed with pgp key 911402D8, which is
> > > > > > included in the repository's KEYS file:
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://git-wip-us.apache.org/repos/asf?p=samza.git;a=blob_plain;f=KEYS;hb=6f5bafb6cd93934781161eb6b1868d11ea347c95
> > > > > >
> > > > > > and can also be found on keyservers:
> > > > > >
> > > > > > http://pgp.mit.edu/pks/lookup?op=get&search=0x911402D8
> > > > > >
> > > > > > The git tag is release-0.9.1-rc0 and signed with the same pgp
> key:
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=e78b9e7f34650538b4bb68b338eb472b98a5709e
> > > > > >
> > > > > > Test binaries have been published to Maven's staging repository,
> > and
> > > > are
> > > > > > available here:
> > > > > >
> > > > > > 5 critical bugs were resolved for this release:
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/browse/SAMZA-715?jql=project%20%3D%20SAMZA%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20%28Resolved%2C%20Closed%29
> > > > > >
> > > > > > The vote will be open for 72 hours ( end in 6:00pm Saturday,
> > > 06/20/2015
> > > > > ).
> > > > > > Please download the release candidate, check the
> hashes/signature,
> > > > build
> > > > > it
> > > > > > and test it, and then please vote:
> > > > > >
> > > > > > [ ] +1 approve
> > > > > > [ ] +0 no opinion
> > > > > > [ ] -1 disapprove (and reason why)
> > > > > >
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>


Re: Samza hung after bootstrapping

2015-06-21 Thread Roger Hoover
Thanks, Yan.  I'll give it a try.

Sent from my iPhone

> On Jun 21, 2015, at 10:02 PM, Yan Fang  wrote:
> 
> Hi Roger,
> 
> I will try to look at the issue tomorrow if my time allows.
> 
> First thing first:
> 
> The build has some unexpected results. A quick fix:
> 
> 1. apply https://issues.apache.org/jira/browse/SAMZA-712
> 2. add
> 
> sourceSets.main.scala.srcDir "src/main/java" sourceSets.main.java.srcDirs =
> []
> 
> at line 126 of build.gradle.
> 
> Sorry for the inconvenience.
> 
> Thanks,
> 
> Fang, Yan
> yanfang...@gmail.com
> 
> On Sun, Jun 21, 2015 at 3:55 PM, Roger Hoover 
> wrote:
> 
>> Was looking through the code a little and it looks like the
>> BootstrappingChooser could use the list of SSPs passed into its register()
>> method to figure out which partitions it needs to monitor.
>> 
>> I wanted to try to build Samza to play around with it but I'm getting errors
>> trying to build off of both the 0.9.0 and 0.9.1 branches.
>> 
>> thedude:samza (0.9.1) $ ./gradlew clean build
>> 
>> To honour the JVM settings for this build a new JVM will be forked. Please
>> consider using the daemon:
>> http://gradle.org/docs/2.0/userguide/gradle_daemon.html.
>> 
>> :clean
>> 
>> :samza-api:clean
>> 
>> :samza-core_2.10:clean
>> 
>> :samza-kafka_2.10:clean UP-TO-DATE
>> 
>> :samza-kv-inmemory_2.10:clean UP-TO-DATE
>> 
>> :samza-kv-rocksdb_2.10:clean UP-TO-DATE
>> 
>> :samza-kv_2.10:clean UP-TO-DATE
>> 
>> :samza-log4j:clean UP-TO-DATE
>> 
>> :samza-shell:clean UP-TO-DATE
>> 
>> :samza-test_2.10:clean UP-TO-DATE
>> 
>> :samza-yarn_2.10:clean UP-TO-DATE
>> 
>> :assemble UP-TO-DATE
>> 
>> :rat
>> 
>> Rat report: build/rat/rat-report.html
>> 
>> :check
>> 
>> :build
>> 
>> :samza-api:compileJava
>> 
>> :samza-api:processResources UP-TO-DATE
>> 
>> :samza-api:classes
>> 
>> :samza-api:jar
>> 
>> :samza-api:javadoc
>> 
>> 
>> /Users/rhoover/Work/samza/samza-api/src/main/java/org/apache/samza/task/TaskContext.java:49:
>> warning: no @param for ssp
>> 
>>  void setStartingOffset(SystemStreamPartition ssp, String offset);
>> 
>>   ^
>> 
>> 
>> /Users/rhoover/Work/samza/samza-api/src/main/java/org/apache/samza/task/TaskContext.java:49:
>> warning: no @param for offset
>> 
>>  void setStartingOffset(SystemStreamPartition ssp, String offset);
>> 
>>   ^
>> 
>> 2 warnings
>> 
>> :samza-api:javadocJar
>> 
>> :samza-api:sourcesJar
>> 
>> :samza-api:signArchives SKIPPED
>> 
>> :samza-api:assemble
>> 
>> :samza-api:compileTestJava
>> 
>> :samza-api:processTestResources UP-TO-DATE
>> 
>> :samza-api:testClasses
>> 
>> :samza-api:test
>> 
>> :samza-api:check
>> 
>> :samza-api:build
>> 
>> :samza-core_2.10:compileJava
>> 
>> :samza-core_2.10:compileScala
>> 
>> [ant:scalac]
>> 
>> /Users/rhoover/Work/samza/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala:43:
>> error: object SamzaObjectMapper is not a member of package
>> org.apache.samza.serializers.model
>> 
>> [ant:scalac] import org.apache.samza.serializers.model.SamzaObjectMapper
>> 
>> [ant:scalac]^
>> 
>> [ant:scalac]
>> 
>> /Users/rhoover/Work/samza/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala:40:
>> error: object TaskModel is not a member of package
>> org.apache.samza.job.model
>> 
>> [ant:scalac] import org.apache.samza.job.model.TaskModel
>> 
>> [ant:scalac]^
>> 
>> ...
>> 
>> 
>> I've got JDK 8 installed.  Wondering whether that makes a difference or not.  I'd
>> appreciate any help.
>> 
>> Thanks,
>> 
>> Roger
>> 
>> 
>> 
>> On Sun, Jun 21, 2015 at 1:02 PM, Roger Hoover 
>> wrote:
>> 
>>> I think I see what's happening.
>>> 
>>> When there are 8 tasks and I set yarn.container.count=8, then each
>>> container is responsible for a single task.  However, the
>>> systemStreamLagCounts map (
>> https://github.com/apache/samza/blob/0.9.0/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala#L77
>> )
>>> and laggingSystemStreamPartitions (
>> https://github.com/apache/samza/blob/0.9.0/samza-cor

Re: Samza hung after bootstrapping

2015-06-21 Thread Roger Hoover
Was looking through the code a little and it looks like the
BootstrappingChooser could use the list of SSPs passed into its register()
method to figure out which partitions it needs to monitor.
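
To make the idea concrete, here is a rough sketch of the shape of the change
I have in mind.  This is not the actual BootstrappingChooser code; the class
and method names below are just illustrative:

import org.apache.samza.system.SystemStreamPartition

// Sketch only: remember the SSPs handed to register() and treat only those as
// potentially lagging, instead of seeding the lag maps with every partition
// of the bootstrap stream.
class BootstrapLagTracker {
  private var registeredSsps = Set.empty[SystemStreamPartition]
  private var laggingSsps = Set.empty[SystemStreamPartition]

  // called once for each SSP assigned to this container
  def register(ssp: SystemStreamPartition): Unit = {
    registeredSsps += ssp
  }

  // called at startup with all partitions of the bootstrap stream(s)
  def start(bootstrapSsps: Set[SystemStreamPartition]): Unit = {
    // only partitions this container actually owns can hold up bootstrapping
    laggingSsps = bootstrapSsps intersect registeredSsps
  }

  // called when an SSP's offset catches up to the upcoming offset
  def markCaughtUp(ssp: SystemStreamPartition): Unit = {
    laggingSsps -= ssp
  }

  def fullyBootstrapped: Boolean = laggingSsps.isEmpty
}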

I wanted to try to build Samza to play around with it but I'm getting errors
trying to build off of both the 0.9.0 and 0.9.1 branches.

thedude:samza (0.9.1) $ ./gradlew clean build

To honour the JVM settings for this build a new JVM will be forked. Please
consider using the daemon:
http://gradle.org/docs/2.0/userguide/gradle_daemon.html.

:clean

:samza-api:clean

:samza-core_2.10:clean

:samza-kafka_2.10:clean UP-TO-DATE

:samza-kv-inmemory_2.10:clean UP-TO-DATE

:samza-kv-rocksdb_2.10:clean UP-TO-DATE

:samza-kv_2.10:clean UP-TO-DATE

:samza-log4j:clean UP-TO-DATE

:samza-shell:clean UP-TO-DATE

:samza-test_2.10:clean UP-TO-DATE

:samza-yarn_2.10:clean UP-TO-DATE

:assemble UP-TO-DATE

:rat

Rat report: build/rat/rat-report.html

:check

:build

:samza-api:compileJava

:samza-api:processResources UP-TO-DATE

:samza-api:classes

:samza-api:jar

:samza-api:javadoc

/Users/rhoover/Work/samza/samza-api/src/main/java/org/apache/samza/task/TaskContext.java:49:
warning: no @param for ssp

  void setStartingOffset(SystemStreamPartition ssp, String offset);

   ^

/Users/rhoover/Work/samza/samza-api/src/main/java/org/apache/samza/task/TaskContext.java:49:
warning: no @param for offset

  void setStartingOffset(SystemStreamPartition ssp, String offset);

   ^

2 warnings

:samza-api:javadocJar

:samza-api:sourcesJar

:samza-api:signArchives SKIPPED

:samza-api:assemble

:samza-api:compileTestJava

:samza-api:processTestResources UP-TO-DATE

:samza-api:testClasses

:samza-api:test

:samza-api:check

:samza-api:build

:samza-core_2.10:compileJava

:samza-core_2.10:compileScala

[ant:scalac]
/Users/rhoover/Work/samza/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala:43:
error: object SamzaObjectMapper is not a member of package
org.apache.samza.serializers.model

[ant:scalac] import org.apache.samza.serializers.model.SamzaObjectMapper

[ant:scalac]^

[ant:scalac]
/Users/rhoover/Work/samza/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala:40:
error: object TaskModel is not a member of package
org.apache.samza.job.model

[ant:scalac] import org.apache.samza.job.model.TaskModel

[ant:scalac]^

...


I've got JDK 8 installed.  Wondering whether that makes a difference or not.  I'd
appreciate any help.

Thanks,

Roger



On Sun, Jun 21, 2015 at 1:02 PM, Roger Hoover 
wrote:

> I think I see what's happening.
>
> When there are 8 tasks and I set yarn.container.count=8, then each
> container is responsible for a single task.  However, the
> systemStreamLagCounts map (
> https://github.com/apache/samza/blob/0.9.0/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala#L77)
> and laggingSystemStreamPartitions (
> https://github.com/apache/samza/blob/0.9.0/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala#L83)
> are configured to track all partitions for the bootstrap topic rather than
> just the one partition assigned to this task.
>
> Later in the log, we see that the task/container completed bootstrap for
> its own partition.
>
> 2015-06-21 12:28:55 org.apache.samza.system.chooser.BootstrappingChooser
> [DEBUG] Bootstrap stream partition is fully caught up:
> SystemStreamPartition [kafka, deploy.svc.tlrnsZOYQA6wrwAA4FLqZA, 0]
>
> but the Bootstrapping Chooser still thinks that the remaining partitions
> (assigned to other tasks in other containers) need to be completed.  JMX at
> this point shows 7 lagging partitions of the 8 original partition count.
>
> I'm wondering why no one has run into this.  Doesn't LinkedIn use
> partitioned bootstrapped topics?
>
> Thanks,
>
> Roger
>
> On Sun, Jun 21, 2015 at 12:22 PM, Roger Hoover 
> wrote:
>
>> Hi Yan,
>>
>> I've uploaded a file with TRACE level logging here:
>> http://filebin.ca/261yhsTZcZQZ/samza-container-0.log.gz
>>
>> I really appreciate your help as this is a critical issue for me.
>>
>> Thanks,
>>
>> Roger
>>
>> On Fri, Jun 19, 2015 at 12:05 PM, Yan Fang  wrote:
>>
>>> Hi Roger,
>>>
>>> " but it only spawns one container and still hangs after bootstrap"
>>> -- this probably is due to your local machine does not have enough
>>> resource for the second container. Because I checked your log file, each
>>> container is about 4GB.
>>>
>>> "When I run it on our YARN cluster with a single container, it works
>>> correctly.  When I tried it with 5 containers, it gets hung after
>>> consuming
>>> the bootstrap topic."

Re: [VOTE] Apache Samza 0.9.1 RC0

2015-06-21 Thread Roger Hoover
Hi all,

Do you think we could get this bootstrapping bug fixed before 0.9.1
release?  It seems like a critical bug.

https://issues.apache.org/jira/browse/SAMZA-720

Thanks,

Roger

On Sat, Jun 20, 2015 at 10:38 PM, Yan Fang  wrote:

> Agree. I will test it this weekend.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Sat, Jun 20, 2015 at 3:46 PM, Guozhang Wang  wrote:
>
> > Since we only get one vote so far, I think I have to extend the vote
> > deadline. Let's set it to next Monday 6pm.
> >
> > Please check the candidate and vote for your opinions.
> >
> > Guozhang
> >
> > On Fri, Jun 19, 2015 at 10:03 AM, Yi Pan  wrote:
> >
> > > +1. Ran the Samza failure test suite and succeeded over night.
> > >
> > > On Wed, Jun 17, 2015 at 5:54 PM, Guozhang Wang 
> > wrote:
> > >
> > > > Hey all,
> > > >
> > > > This is a call for a vote on a release of Apache Samza 0.9.1. This
> is a
> > > > bug-fix release against 0.9.0.
> > > >
> > > > The release candidate can be downloaded from here:
> > > >
> > > > http://people.apache.org/~guozhang/samza-0.9.1-rc0/
> > > >
> > > > The release candidate is signed with pgp key 911402D8, which is
> > > > included in the repository's KEYS file:
> > > >
> > > >
> > > >
> > >
> >
> https://git-wip-us.apache.org/repos/asf?p=samza.git;a=blob_plain;f=KEYS;hb=6f5bafb6cd93934781161eb6b1868d11ea347c95
> > > >
> > > > and can also be found on keyservers:
> > > >
> > > > http://pgp.mit.edu/pks/lookup?op=get&search=0x911402D8
> > > >
> > > > The git tag is release-0.9.1-rc0 and signed with the same pgp key:
> > > >
> > > >
> > > >
> > >
> >
> https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=e78b9e7f34650538b4bb68b338eb472b98a5709e
> > > >
> > > > Test binaries have been published to Maven's staging repository, and
> > are
> > > > available here:
> > > >
> > > > 5 critical bugs were resolved for this release:
> > > >
> > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/browse/SAMZA-715?jql=project%20%3D%20SAMZA%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20%28Resolved%2C%20Closed%29
> > > >
> > > > The vote will be open for 72 hours ( end in 6:00pm Saturday,
> 06/20/2015
> > > ).
> > > > Please download the release candidate, check the hashes/signature,
> > build
> > > it
> > > > and test it, and then please vote:
> > > >
> > > > [ ] +1 approve
> > > > [ ] +0 no opinion
> > > > [ ] -1 disapprove (and reason why)
> > > >
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>


Re: Samza hung after bootstrapping

2015-06-21 Thread Roger Hoover
I think I see what's happening.

When there are 8 tasks and I set yarn.container.count=8, then each
container is responsible for a single task.  However, the
systemStreamLagCounts map (
https://github.com/apache/samza/blob/0.9.0/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala#L77)
and laggingSystemStreamPartitions (
https://github.com/apache/samza/blob/0.9.0/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala#L83)
are configured to track all partitions for the bootstrap topic rather than
just the one partition assigned to this task.

Later in the log, we see that the task/container completed bootstrap for
its own partition.

2015-06-21 12:28:55 org.apache.samza.system.chooser.BootstrappingChooser
[DEBUG] Bootstrap stream partition is fully caught up:
SystemStreamPartition [kafka, deploy.svc.tlrnsZOYQA6wrwAA4FLqZA, 0]

but the Bootstrapping Chooser still thinks that the remaining partitions
(assigned to other tasks in other containers) need to be completed.  JMX at
this point shows 7 lagging partitions of the 8 original partition count.

I'm wondering why no one has run into this.  Doesn't LinkedIn use
partitioned bootstrapped topics?

Thanks,

Roger

On Sun, Jun 21, 2015 at 12:22 PM, Roger Hoover 
wrote:

> Hi Yan,
>
> I've uploaded a file with TRACE level logging here:
> http://filebin.ca/261yhsTZcZQZ/samza-container-0.log.gz
>
> I really appreciate your help as this is a critical issue for me.
>
> Thanks,
>
> Roger
>
> On Fri, Jun 19, 2015 at 12:05 PM, Yan Fang  wrote:
>
>> Hi Roger,
>>
>> " but it only spawns one container and still hangs after bootstrap"
>> -- this probably is due to your local machine does not have enough
>> resource for the second container. Because I checked your log file, each
>> container is about 4GB.
>>
>> "When I run it on our YARN cluster with a single container, it works
>> correctly.  When I tried it with 5 containers, it gets hung after
>> consuming
>> the bootstrap topic."
>>-- Have you figure it out? I have a looked at your log and also the
>> code. My suspect is that, there is a null enveloper somehow blocking the
>> process. If you can paste the trace level log, it will be more helpful
>> because many logs in chooser are trace level.
>>
>> Thanks,
>>
>> Fang, Yan
>> yanfang...@gmail.com
>>
>> On Thu, Jun 18, 2015 at 5:20 PM, Roger Hoover 
>> wrote:
>>
>> > I need some help.  I have a job which bootstraps one stream and then is
>> > supposed to read from two.  When I run it on our YARN cluster with a
>> single
>> > container, it works correctly.  When I tried it with 5 containers, it
>> gets
>> > hung after consuming the bootstrap topic.  I ran it with the grid
>> script on
>> > my laptop (Mac OS X) with yarn.container.count=2 but it only spawns one
>> > container and still hangs after bootstrap.
>> >
>> > Debug logs are here: http://pastebin.com/af3KPvju
>> >
>> > I looked at JMX metrics and see:
>> > - Task Metrics - no value for kafka offset of non-bootstrapped stream
>> > -  SystemConsumerMetrics
>> > - choose null keeps incrementing
>> >  - ssps-needed-by-chooser 1
>> >   - unprocessed-messages 62k
>> > - Bootstrapping Chooser
>> >   - lagging partitions 4
>> >   - laggin-batch-streams - 4
>> >   - batch-resets - 0
>> >
>> > Has anyone seen this or can offer ideas of how to better debug it?
>> >
>> > I'm using Samza 0.9.0 and YARN 2.4.0.
>> >
>> > Thanks!
>> >
>> > Roger
>> >
>>
>
>


Re: Samza hung after bootstrapping

2015-06-21 Thread Roger Hoover
Hi Yan,

I've uploaded a file with TRACE level logging here:
http://filebin.ca/261yhsTZcZQZ/samza-container-0.log.gz

I really appreciate your help as this is a critical issue for me.

Thanks,

Roger

On Fri, Jun 19, 2015 at 12:05 PM, Yan Fang  wrote:

> Hi Roger,
>
> " but it only spawns one container and still hangs after bootstrap"
> -- this probably is due to your local machine does not have enough
> resource for the second container. Because I checked your log file, each
> container is about 4GB.
>
> "When I run it on our YARN cluster with a single container, it works
> correctly.  When I tried it with 5 containers, it gets hung after consuming
> the bootstrap topic."
>-- Have you figure it out? I have a looked at your log and also the
> code. My suspect is that, there is a null enveloper somehow blocking the
> process. If you can paste the trace level log, it will be more helpful
> because many logs in chooser are trace level.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Thu, Jun 18, 2015 at 5:20 PM, Roger Hoover 
> wrote:
>
> > I need some help.  I have a job which bootstraps one stream and then is
> > supposed to read from two.  When I run it on our YARN cluster with a
> single
> > container, it works correctly.  When I tried it with 5 containers, it
> gets
> > hung after consuming the bootstrap topic.  I ran it with the grid script
> on
> > my laptop (Mac OS X) with yarn.container.count=2 but it only spawns one
> > container and still hangs after bootstrap.
> >
> > Debug logs are here: http://pastebin.com/af3KPvju
> >
> > I looked at JMX metrics and see:
> > - Task Metrics - no value for kafka offset of non-bootstrapped stream
> > -  SystemConsumerMetrics
> > - choose null keeps incrementing
> >  - ssps-needed-by-chooser 1
> >   - unprocessed-messages 62k
> > - Bootstrapping Chooser
> >   - lagging partitions 4
> >   - laggin-batch-streams - 4
> >   - batch-resets - 0
> >
> > Has anyone seen this or can offer ideas of how to better debug it?
> >
> > I'm using Samza 0.9.0 and YARN 2.4.0.
> >
> > Thanks!
> >
> > Roger
> >
>


Re: Samza hung after bootstrapping

2015-06-20 Thread Roger Hoover
Thank you, Yan.  I'll get a trace level log as soon as I can.

Sent from my iPhone

> On Jun 19, 2015, at 12:05 PM, Yan Fang  wrote:
> 
> Hi Roger,
> 
> " but it only spawns one container and still hangs after bootstrap"
>-- this probably is due to your local machine does not have enough
> resource for the second container. Because I checked your log file, each
> container is about 4GB.
> 
> "When I run it on our YARN cluster with a single container, it works
> correctly.  When I tried it with 5 containers, it gets hung after consuming
> the bootstrap topic."
>   -- Have you figure it out? I have a looked at your log and also the
> code. My suspect is that, there is a null enveloper somehow blocking the
> process. If you can paste the trace level log, it will be more helpful
> because many logs in chooser are trace level.
> 
> Thanks,
> 
> Fang, Yan
> yanfang...@gmail.com
> 
> On Thu, Jun 18, 2015 at 5:20 PM, Roger Hoover 
> wrote:
> 
>> I need some help.  I have a job which bootstraps one stream and then is
>> supposed to read from two.  When I run it on our YARN cluster with a single
>> container, it works correctly.  When I tried it with 5 containers, it gets
>> hung after consuming the bootstrap topic.  I ran it with the grid script on
>> my laptop (Mac OS X) with yarn.container.count=2 but it only spawns one
>> container and still hangs after bootstrap.
>> 
>> Debug logs are here: http://pastebin.com/af3KPvju
>> 
>> I looked at JMX metrics and see:
>> - Task Metrics - no value for kafka offset of non-bootstrapped stream
>> -  SystemConsumerMetrics
>>- choose null keeps incrementing
>> - ssps-needed-by-chooser 1
>>  - unprocessed-messages 62k
>> - Bootstrapping Chooser
>>  - lagging partitions 4
>>  - laggin-batch-streams - 4
>>  - batch-resets - 0
>> 
>> Has anyone seen this or can offer ideas of how to better debug it?
>> 
>> I'm using Samza 0.9.0 and YARN 2.4.0.
>> 
>> Thanks!
>> 
>> Roger
>> 


Samza hung after bootstrapping

2015-06-18 Thread Roger Hoover
I need some help.  I have a job which bootstraps one stream and then is
supposed to read from two.  When I run it on our YARN cluster with a single
container, it works correctly.  When I tried it with 5 containers, it gets
hung after consuming the bootstrap topic.  I ran it with the grid script on
my laptop (Mac OS X) with yarn.container.count=2 but it only spawns one
container and still hangs after bootstrap.

Debug logs are here: http://pastebin.com/af3KPvju

I looked at JMX metrics and see:
- Task Metrics - no value for kafka offset of non-bootstrapped stream
-  SystemConsumerMetrics
- choose null keeps incrementing
 - ssps-needed-by-chooser 1
  - unprocessed-messages 62k
- Bootstrapping Chooser
  - lagging partitions 4
  - laggin-batch-streams - 4
  - batch-resets - 0

Has anyone seen this or can offer ideas of how to better debug it?

I'm using Samza 0.9.0 and YARN 2.4.0.

Thanks!

Roger


Re: Yarn scheduling

2015-05-30 Thread Roger Hoover
Garry,

Thanks for the reply.  My test wasn't very scientific.  I scheduled 16
tasks at once on a test cluster of two nodes and when I looked at the ones
running, one machine was maxed out (memory-wise) with 10 containers while
the other had only 3 and was adding the remaining 3 tasks.  It seemed the
scheduling algorithm filled one node completely before moving on to the other.

Thanks,

Roger


On Fri, May 29, 2015 at 4:42 PM, Garry Turkington <
g.turking...@improvedigital.com> wrote:

> Hi Roger,
>
> I've seen unbalanced container assignment across hosts but never one being
> maxed out before any others get any containers. So I'd look to the YARN
> config to start with.
>
> I believe though there will be a risk of this type of thing until  YARN
> implements anti-affinity:
>
> https://issues.apache.org/jira/browse/YARN-1042
>
> But as said above, maxing out a host at a time does sound odd to me.
>
> Garry
>
> -Original Message-
> From: Roger Hoover [mailto:roger.hoo...@gmail.com]
> Sent: 29 May 2015 23:09
> To: dev@samza.apache.org
> Subject: Yarn scheduling
>
> Hi,
>
> I notice that when YARN schedules my jobs, it loads up one machine
> completely before scheduling on the next.  I'm using Capacity Scheduler
> with a default config.
>
> Is there a way to make it "round-robin" among the available machines?
>
> Thanks,
>
> Roger
>
> -
> No virus found in this message.
> Checked by AVG - www.avg.com
> Version: 2014.0.4800 / Virus Database: 4311/9897 - Release Date: 05/29/15
>


Yarn scheduling

2015-05-29 Thread Roger Hoover
Hi,

I notice that when YARN schedules my jobs, it loads up one machine
completely before scheduling on the next.  I'm using Capacity Scheduler
with a default config.

Is there a way to make it "round-robin" among the available machines?

Thanks,

Roger


Re: Samza job throughput much lower than Kafka throughput

2015-05-21 Thread Roger Hoover
Oops.  Sent too soon.  I mean:

producer.batch.size=262144
producer.linger.ms=5
producer.compression.type=lz4
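
In a Samza job config these usually go under the system-producer prefix; a
quick sketch assuming the system is named "kafka" as in your config:

systems.kafka.producer.batch.size=262144
systems.kafka.producer.linger.ms=5
systems.kafka.producer.compression.type=lz4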


On Thu, May 21, 2015 at 9:00 AM, Roger Hoover 
wrote:

> Hi George,
>
> You might also try tweaking the producer settings.
>
> producer.batch.size=262144
> producer.linger.ms=5
> producer.compression.type: lz4
>
> On Wed, May 20, 2015 at 9:30 PM, Guozhang Wang  wrote:
>
>> Hi George,
>>
>> Is there any reason you need to set the following configs?
>>
>> systems.kafka.consumer.fetch.wait.max.ms= 1
>>
>> This setting will basically disable long pooling of the consumer which
>> will
>> then busy fetching data from broker, which has a large impact on network
>> latency especially when the consumer is already caught up with the Kafka
>> broker.
>>
>> Also when you say it is "slower than a program reading directly from
>> Kafka." which consumer did your program use to read data from Kafka?
>>
>> Guozhang
>>
>>
>> On Wed, May 20, 2015 at 5:01 PM, George Li  wrote:
>>
>> > Hi Yi,
>> >
>> > Thanks for the reply. Below is my job config and code.
>> >
>> > When we run this job inside our dev docker container, which has
>> zookeeper,
>> > broker, and yarn installed locally,  its throughput is at least 50%
>> higher
>> > than our cluster run's.
>> >
>> > Thanks,
>> >
>> > George
>> >
>> > Configuration:
>> >
>> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>> > job.name=container-performance
>> >
>> > # YARN
>> > yarn.container.count=1
>> > yarn.container.memory.mb=2548
>> > yarn.package.path={my package on hdfs}
>> > yarn.container.retry.count=0
>> > yarn.am.container.memory.mb=2048
>> > yarn.am.jmx.enabled=false
>> >
>> > # Task
>> > task.opts=-server -Xmx1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC
>> > -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark
>> > -XX:+DisableExplicitGC -Djava.awt.headless=true
>> >
>> > task.class=samza.TestPerformanceTask
>> > task.inputs=kafka.throughput-test2
>> > task.log.interval=100
>> > task.checkpoint.factory =
>> > org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>> > task.checkpoint.system=kafka
>> > task.checkpoint.replication.factor=1
>> >
>> > # Kafka System (only used for coordinator stream in this test)
>> >
>> >
>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> > systems.kafka.samza.fetch.threshold=5
>> >
>> > systems.kafka.consumer.zookeeper.connect= {zookeeper}
>> > systems.kafka.producer.bootstrap.servers={broker node}
>> > systems.kafka.consumer.auto.offset.reset=smallest
>> > systems.kafka.consumer.socket.receive.buffer.bytes= 220
>> > systems.kafka.consumer.fetch.message.max.bytes= 110
>> > systems.kafka.consumer.fetch.min.bytes= 1
>> > systems.kafka.consumer.fetch.wait.max.ms= 1
>> >
>> > #define coordinator system
>> > job.coordinator.system=kafka
>> > job.coordinator.replication.factor=1
>> >
>> > systems.kafka.streams.throughput-test2.samza.reset.offset=true
>> > systems.kafka.streams.throughput-test2.samza.offset.default=oldest
>> > ~
>> >
>> > Job's code. This is mostly a copy-paste of the one in the repository
>> >
>> > object TestPerformanceTask {
>> >   // No thread safety is needed for these variables because they're
>> > mutated in
>> >   //   // the process method, which is single threaded.
>> >   var messagesProcessed = 0
>> >   var startTime = 0L
>> > }
>> >
>> > class TestPerformanceTask extends StreamTask with InitableTask with
>> > Logging {
>> >   import TestPerformanceTask._
>> >
>> >   /**
>> >** How many messages to process before a log message is printed.
>> >*   */
>> >   var logInterval = 1
>> >
>> >   /**
>> >** How many messages to process before shutting down.
>> >*   */
>> >   var maxMessages = 1000
>> >
>> >
>> >   var outputSystemStream: Option[SystemStream] = None
>> >
>> >   def init(config: Config, context: TaskContext) {
>> > logInterval = config.getInt("task.log.interval", 1)

Re: Samza job throughput much lower than Kafka throughput

2015-05-21 Thread Roger Hoover
Hi George,

You might also try tweaking the producer settings.

producer.batch.size=262144
producer.linger.ms=5
producer.compression.type: lz4

On Wed, May 20, 2015 at 9:30 PM, Guozhang Wang  wrote:

> Hi George,
>
> Is there any reason you need to set the following configs?
>
> systems.kafka.consumer.fetch.wait.max.ms= 1
>
> This setting will basically disable long pooling of the consumer which will
> then busy fetching data from broker, which has a large impact on network
> latency especially when the consumer is already caught up with the Kafka
> broker.
>
> Also when you say it is "slower than a program reading directly from
> Kafka." which consumer did your program use to read data from Kafka?
>
> Guozhang
>
>
> On Wed, May 20, 2015 at 5:01 PM, George Li  wrote:
>
> > Hi Yi,
> >
> > Thanks for the reply. Below is my job config and code.
> >
> > When we run this job inside our dev docker container, which has
> zookeeper,
> > broker, and yarn installed locally,  its throughput is at least 50%
> higher
> > than our cluster run's.
> >
> > Thanks,
> >
> > George
> >
> > Configuration:
> >
> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> > job.name=container-performance
> >
> > # YARN
> > yarn.container.count=1
> > yarn.container.memory.mb=2548
> > yarn.package.path={my package on hdfs}
> > yarn.container.retry.count=0
> > yarn.am.container.memory.mb=2048
> > yarn.am.jmx.enabled=false
> >
> > # Task
> > task.opts=-server -Xmx1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC
> > -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark
> > -XX:+DisableExplicitGC -Djava.awt.headless=true
> >
> > task.class=samza.TestPerformanceTask
> > task.inputs=kafka.throughput-test2
> > task.log.interval=100
> > task.checkpoint.factory =
> > org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> > task.checkpoint.system=kafka
> > task.checkpoint.replication.factor=1
> >
> > # Kafka System (only used for coordinator stream in this test)
> >
> >
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > systems.kafka.samza.fetch.threshold=5
> >
> > systems.kafka.consumer.zookeeper.connect= {zookeeper}
> > systems.kafka.producer.bootstrap.servers={broker node}
> > systems.kafka.consumer.auto.offset.reset=smallest
> > systems.kafka.consumer.socket.receive.buffer.bytes= 220
> > systems.kafka.consumer.fetch.message.max.bytes= 110
> > systems.kafka.consumer.fetch.min.bytes= 1
> > systems.kafka.consumer.fetch.wait.max.ms= 1
> >
> > #define coordinator system
> > job.coordinator.system=kafka
> > job.coordinator.replication.factor=1
> >
> > systems.kafka.streams.throughput-test2.samza.reset.offset=true
> > systems.kafka.streams.throughput-test2.samza.offset.default=oldest
> > ~
> >
> > Job's code. This is mostly a copy-paste of the one in the repository
> >
> > object TestPerformanceTask {
> >   // No thread safety is needed for these variables because they're
> > mutated in
> >   //   // the process method, which is single threaded.
> >   var messagesProcessed = 0
> >   var startTime = 0L
> > }
> >
> > class TestPerformanceTask extends StreamTask with InitableTask with
> > Logging {
> >   import TestPerformanceTask._
> >
> >   /**
> >** How many messages to process before a log message is printed.
> >*   */
> >   var logInterval = 1
> >
> >   /**
> >** How many messages to process before shutting down.
> >*   */
> >   var maxMessages = 1000
> >
> >
> >   var outputSystemStream: Option[SystemStream] = None
> >
> >   def init(config: Config, context: TaskContext) {
> > logInterval = config.getInt("task.log.interval", 1)
> > maxMessages = config.getInt("task.max.messages", 1000)
> > outputSystemStream = Option(config.get("task.outputs",
> > null)).map(Util.getSystemStreamFromNames(_))
> > println("init!!")
> >   }
> >
> >   def process(envelope: IncomingMessageEnvelope, collector:
> > MessageCollector, coordinator: TaskCoordinator) {
> > if (startTime == 0) {
> >   startTime = System.currentTimeMillis
> > }
> >
> > if (outputSystemStream.isDefined) {
> >   collector.send(new OutgoingMessageEnvelope(outputSystemStream.get,
> > envelope.getKey, envelope.getMessage))
> > }
> >
> > messagesProcessed += 1
> >
> > if (messagesProcessed % logInterval == 0) {
> >   val seconds = (System.currentTimeMillis - startTime) / 1000
> >   println("Processed %s messages in %s seconds." format
> > (messagesProcessed, seconds))
> > }
> >
> >
> > if (messagesProcessed >= maxMessages) {
> >   coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)
> > }
> >   }
> > }
> >
> >
> >
> > From:   Yi Pan 
> > To: dev@samza.apache.org,
> > Date:   20/05/2015 05:03 PM
> > Subject:Re: Samza job throughput much lower than Kafka throughput
> >
> >
> >
> > Hi, George,
> >
> > Could you share w/ us the code and configuration of your sample test job?
> > Than

Re: Errors and hung job on broker shutdown

2015-04-29 Thread Roger Hoover
Guozhang and Yan,

Thank you both for your responses.  I tried a lot of combinations and I
think I've determined that it's the new producer + snappy that causes the issue.

It never happens with the old producer and it never happens with lz4 or no
compression.  It only happens when a broker gets restarted (or maybe just
shut down).

The error is not always the same.  I've noticed at least three types of
errors on the Kafka brokers.

1) java.io.IOException: failed to read chunk
at
org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:356)
http://pastebin.com/NZrrEHxU
2) java.lang.OutOfMemoryError: Java heap space
   at
org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:346)
http://pastebin.com/yuxk1BjY
3) java.io.IOException: PARSING_ERROR(2)
  at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
http://pastebin.com/yq98Hx49

I've noticed a couple of different behaviors from the Samza producer/job:
A) It goes into a long retry loop where this message is logged.  I saw this
with error #1 above.

2015-04-29 18:17:31 Sender [WARN] task[Partition 7]
ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,7] offset[253] Got
error produce response with correlation id 4878 on topic-partition
svc.call.w_deploy.T2UDe2PWRYWcVAAAhMOAwA-1, retrying (2147483646 attempts
left). Error: CORRUPT_MESSAGE

B) The job exits with
org.apache.kafka.common.errors.UnknownServerException (at least when run as
ThreadJob).  I saw this with error #3 above.

org.apache.samza.SamzaException: Unable to send message from
TaskName-Partition 6 to system kafka.
org.apache.kafka.common.errors.UnknownServerException: The server
experienced an unexpected error when processing the request

This seems most likely to be a bug in the new Kafka producer.  I'll
probably file a JIRA for that project.
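
In the meantime, a possible workaround sketch (only tested as far as the
combinations above) is to move the job's producer off snappy in the job
config, assuming the system is named "kafka":

# avoid snappy with the new producer until the bug is sorted out
systems.kafka.producer.compression.type=lz4
# or disable compression entirely:
# systems.kafka.producer.compression.type=none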

Thanks,

Roger

On Wed, Apr 29, 2015 at 7:38 PM, Guozhang Wang  wrote:

> And just to answer your first question: SIGTERM with
> controlled.shutdown=true should be OK for bouncing the broker.
>
> Guozhang
>
> On Wed, Apr 29, 2015 at 7:36 PM, Guozhang Wang  wrote:
>
> > Roger,
> >
> > I believe Samza 0.9.0 already uses the Java producer.
> >
> > Java producer's close() call will try to flush all buffered data to the
> > brokers before completing the call. However, if some buffered data's
> > destination partition leader is not known, the producer will block on
> > refreshing the metadata and then retry sending.
> >
> > From the broker logs, it seems it does receive the producer request but
> > failed to handle it due to "Leader not local" after the bounce:
> >
> > 
> > [2015-04-28 14:26:44,729] WARN [KafkaApi-0] Produce request with
> > correlation id 226 from client
> > samza_producer-svc_call_w_deploy_to_json-1-1430244278081-3 on partition
> > [sys.samza_metrics,0] failed due to Leader not local for partition
> > [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
> > [2015-04-28 14:26:47,426] WARN [KafkaApi-0] Produce request with
> > correlation id 45671 from client
> > samza_checkpoint_manager-svc_call_join_deploy-1-1429911482243-4 on
> > partition [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] failed
> > due to Leader not local for partition
> > [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] on broker 0
> > (kafka.server.KafkaApis)
> > [2015-04-28 14:27:24,578] WARN [KafkaApi-0] Produce request with
> > correlation id 12267 from client
> > samza_producer-svc_call_join_deploy-1-1429911471254-0 on partition
> > [sys.samza_metrics,0] failed due to Leader not local for partition
> > [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
> > 
> >
> > because for these two topic-partitions (sys.samza_metrics,0 and
> > __samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0), their lead has
> been
> > moved to broker id:1,host:sit320w80m7,port:9092. When the producer gets
> the
> > error code from the old leader, it should refresh its metadata and get
> the
> > new leader as broker-1, and retry sending, but for some reason it does
> not
> > refresh its metadata. Without producer logs from Samza container I cannot
> > further investigate the issue.
> >
> > Which Kafka version does Samza 0.9.0 use?
> >
> > Guozhang
> >
> > On Wed, Apr 29, 2015 at 4:30 PM, Yan Fang  wrote:
> >
> >> Not sure about the Kafka side. From the Samza side, from your
> >> description ( "does
> >> not exit nor does it make any progress" ), I think the code is stuck in
> >> producer.close
> >> <
> >>
> https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apach

Re: Errors and hung job on broker shutdown

2015-04-28 Thread Roger Hoover
At error level logging, this was the only entry in the Samza log:

2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,2] offset[9129395]
Unable to send message from TaskName-Partition 1 to system kafka

Here is the log from the Kafka broker that was shutdown.

http://pastebin.com/afgmLyNF

Thanks,

Roger


On Tue, Apr 28, 2015 at 3:49 PM, Yi Pan  wrote:

> Roger, could you paste the full log from Samza container? If you can figure
> out which Kafka broker the message was sent to, it would be helpful if we
> get the log from the broker as well.
>
> On Tue, Apr 28, 2015 at 3:31 PM, Roger Hoover 
> wrote:
>
> > Hi,
> >
> > I need some help figuring out what's going on.
> >
> > I'm running Kafka 0.8.2.1 and Samza 0.9.0 on YARN.  All the topics have
> > replication factor of 2.
> >
> > I'm bouncing the Kafka broker using SIGTERM (with
> > controlled.shutdown.enable=true).  I see the Samza job log this message
> and
> > then hang (does not exit nor does it make any progress).
> >
> > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
> > ssp[kafka,my-topic,2] offset[9129395] Unable to send message from
> > TaskName-Partition 1 to system kafka
> >
> > The Kafka consumer (Druid Real-Time node) on the other side then barfs on
> > the message:
> >
> > Exception in thread "chief-svc-perf"
> kafka.message.InvalidMessageException:
> > Message is corrupt (stored crc = 1792882425, computed crc = 3898271689)
> > at kafka.message.Message.ensureValid(Message.scala:166)
> > at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:101)
> > at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> > at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> > at
> >
> >
> io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.hasMore(KafkaEightFirehoseFactory.java:106)
> > at
> >
> >
> io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:234)
> >
> > My questions are:
> > 1) What is the right way to bounce a Kafka broker?
> > 2) Is this a bug in Samza that the job hangs after producer request
> fails?
> > Has anyone seen this?
> >
> > Thanks,
> >
> > Roger
> >
>


Errors and hung job on broker shutdown

2015-04-28 Thread Roger Hoover
Hi,

I need some help figuring out what's going on.

I'm running Kafka 0.8.2.1 and Samza 0.9.0 on YARN.  All the topics have
replication factor of 2.

I'm bouncing the Kafka broker using SIGTERM (with
controlled.shutdown.enable=true).  I see the Samza job log this message and
then hang (does not exit nor does it make any progress).

2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
ssp[kafka,my-topic,2] offset[9129395] Unable to send message from
TaskName-Partition 1 to system kafka

The Kafka consumer (Druid Real-Time node) on the other side then barfs on
the message:

Exception in thread "chief-svc-perf" kafka.message.InvalidMessageException:
Message is corrupt (stored crc = 1792882425, computed crc = 3898271689)
at kafka.message.Message.ensureValid(Message.scala:166)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:101)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at
io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.hasMore(KafkaEightFirehoseFactory.java:106)
at
io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:234)

My questions are:
1) What is the right way to bounce a Kafka broker?
2) Is this a bug in Samza that the job hangs after producer request fails?
Has anyone seen this?

Thanks,

Roger


Re: How to configure the Resource Manager endpoint for YARN?

2015-04-15 Thread Roger Hoover
Turns out that HADOOP_CONF_DIR is the right env var (YARN_CONF_DIR did not 
work).  I had just messed up the directory path.  Doh!
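
In case it helps anyone else hitting this, the setup that ended up working for me looks roughly like the following (the paths and hostname here are just examples):

export HADOOP_CONF_DIR=/opt/hadoop/conf   # must point at the directory that actually contains yarn-site.xml

and $HADOOP_CONF_DIR/yarn-site.xml has at least:

<configuration>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>rm.example.com</value>
  </property>
  <property>
    <name>yarn.resourcemanager.address</name>
    <value>rm.example.com:8032</value>
  </property>
</configuration>

With that in place, run-job.sh picks up the real RM address instead of falling back to 0.0.0.0:8032.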

Sent from my iPhone

> On Apr 15, 2015, at 9:41 AM, Roger Hoover  wrote:
> 
> I'll try that.  Thanks, Chris.
> 
>> On Wed, Apr 15, 2015 at 9:37 AM, Chris Riccomini  
>> wrote:
>> Hey Roger,
>> 
>> Not sure if this makes a difference, but have you tried using:
>> 
>>   export YARN_CONF_DIR=...
>> 
>> Instead? This is what we use.
>> 
>> Cheers,
>> Chris
>> 
>> On Wed, Apr 15, 2015 at 9:33 AM, Roger Hoover 
>> wrote:
>> 
>> > Hi,
>> >
>> > I'm trying to deploy a job to a small YARN cluster.  How do I tell the
>> > launcher script where to find the Resource Manager?  I tried creating a
>> > yarn-site.xml and setting HADOOP_CONF_DIR environment variable but it
>> > doesn't find my config.
>> >
>> > 2015-04-14 22:02:45 ClientHelper [INFO] trying to connect to RM
>> > 0.0.0.0:8032
>> > 2015-04-14 22:02:45 RMProxy [INFO] Connecting to ResourceManager at /
>> > 0.0.0.0:8032
>> >
>> > Thanks,
>> >
>> > Roger
>> >
> 


Re: How to configure the Resource Manager endpoint for YARN?

2015-04-15 Thread Roger Hoover
I'll try that.  Thanks, Chris.

On Wed, Apr 15, 2015 at 9:37 AM, Chris Riccomini 
wrote:

> Hey Roger,
>
> Not sure if this makes a difference, but have you tried using:
>
>   export YARN_CONF_DIR=...
>
> Instead? This is what we use.
>
> Cheers,
> Chris
>
> On Wed, Apr 15, 2015 at 9:33 AM, Roger Hoover 
> wrote:
>
> > Hi,
> >
> > I'm trying to deploy a job to a small YARN cluster.  How do I tell the
> > launcher script where to find the Resource Manager?  I tried creating a
> > yarn-site.xml and setting HADOOP_CONF_DIR environment variable but it
> > doesn't find my config.
> >
> > 2015-04-14 22:02:45 ClientHelper [INFO] trying to connect to RM
> > 0.0.0.0:8032
> > 2015-04-14 22:02:45 RMProxy [INFO] Connecting to ResourceManager at /
> > 0.0.0.0:8032
> >
> > Thanks,
> >
> > Roger
> >
>


How to configure the Resource Manager endpoint for YARN?

2015-04-15 Thread Roger Hoover
Hi,

I'm trying to deploy a job to a small YARN cluster.  How do I tell the
launcher script where to find the Resource Manager?  I tried creating a
yarn-site.xml and setting HADOOP_CONF_DIR environment variable but it
doesn't find my config.

2015-04-14 22:02:45 ClientHelper [INFO] trying to connect to RM 0.0.0.0:8032
2015-04-14 22:02:45 RMProxy [INFO] Connecting to ResourceManager at /
0.0.0.0:8032

Thanks,

Roger


Re: Extra Systems and other extensions.

2015-04-15 Thread Roger Hoover
Dan,

This is great.  Would love to have a common ElasticSearch system producer.

Cheers,

Roger

On Tue, Apr 14, 2015 at 1:34 PM, Dan  wrote:

> Thanks Jakob, I agree they'll be more maintained and tested if they're in
> the main repo so that's great.
>
> I'll sort out JIRAs and get some patches of what we've got working now out
> for review.
>
>  - Dan
>
>
> On 14 April 2015 at 19:56, Jakob Homan  wrote:
>
> > Hey Dan-
> > I'd love for the Elastic Search stuff to be added to the main code, as
> > a separate module.  Keeping these in the main source code keeps them
> > more likely to be maintained and correct.
> >
> > The EnvironmentConfigRewriter can likely go in the same place as the
> > ConfigRewriter interface, since it doesn't depend on Kafka as the
> > current RegexTopicRewriter does.
> >
> > If you could open JIRAs for these, that would be great.  Happy to
> > shepard the code in.
> > -Jakob
> >
> > On 14 April 2015 at 11:46, Dan  wrote:
> > > Hey,
> > >
> > > At state.com we've started to write some generic extensions to Samza
> > that
> > > we think would be more generally useful. We've got
> > > an ElasticsearchSystemProducer/Factory to output to an Elasticsearch
> index
> > > and EnvironmentConfigRewriter to modify config from environment
> variable.
> > >
> > > What's the best way for us to add things like this? Do you want more
> > > modules in the main project or should we just create some separate
> > projects
> > > on github?
> > >
> > > It would be good to get core extensions like these shared to be tested
> > and
> > > used by more people
> > >
> > > Thanks,
> > > Dan
> >
>


Re: Joining Avro records

2015-04-13 Thread Roger Hoover
Hi all,

In case this helps anyone, I was able to create a simple class to do the
join and it works nicely for my use case.  It assumes you have schema for
the input and output records.

Example (
https://github.com/Quantiply/rico/blob/master/avro-serde/src/test/java/com/quantiply/avro/JoinTest.java#L44-L52
):

GenericRecord in1 = getIn1();
GenericRecord in2 = getIn2();

GenericRecord joined = new Join(getJoinedSchema())
    .merge(in1)
    .merge(in2)
    .getBuilder()
    .set("charlie", "blah blah")
    .build();

Class is here:
https://github.com/Quantiply/rico/blob/master/avro-serde/src/main/java/com/quantiply/avro/Join.java

Cheers,

Roger

On Thu, Apr 9, 2015 at 12:54 PM, Roger Hoover 
wrote:

> Yi Pan,
>
> Thanks for your response.  I'm thinking that I'll iterate over the fields
> of the input schemas (similar to this
> https://github.com/apache/samza/blob/samza-sql/samza-sql/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java#L58-L62),
> match them up with the output schema and then copy the values.  It'll let
> you know how it goes in case it's useful.
>
> Cheers,
>
> Roger
>
> On Thu, Apr 9, 2015 at 12:07 PM, Yi Pan  wrote:
>
>> Hi, Roger,
>>
>> Good question on that. I am actually not aware of any "automatic" way of
>> doing this in Avro. I have tried to add generic Schema and Data interface
>> in samza-sql branch to address the morphing of the schemas from input
>> streams to the output streams. The basic idea is to have wrapper Schema
>> and
>> Data classes on-top-of the deserialized objects to access the data fields
>> according to the schema w/o changing and copying the actual data fields.
>> Hence, when there is a need to morph the input data schemas into a new
>> output data schema, we just need an implementation of the new output data
>> Schema class that can read the corresponding data fields from the input
>> data and write them out in the output schema. An interface function
>> transform() is added in the Schema class for this exact purpose.
>> Currently,
>> it only takes one input data and one example of "projection"
>> transformation
>> can be found in the implementation of AvroSchema class. A join case as you
>> presented may well be a reason to have an implementation of "join" with
>> multiple input data.
>>
>> All the above solution is still experimental and please feel free to
>> provide your feedback and comments on that. If we agree that this solution
>> is good and suit for a broader use case, it can be considered to be used
>> outside the "SQL" context as well.
>>
>> Best regards!
>>
>> -Yi
>>
>> On Thu, Apr 9, 2015 at 8:55 AM, Roger Hoover 
>> wrote:
>>
>> > Hi Milinda and others,
>> >
>> > This is an Avro question but since you guys are working on Avro support
>> for
>> > stream SQL, I thought I'd ask you for help.
>> >
>> > If I have a two records of type A and B as below and want to join them
>> > similar to "SELECT *" in SQL to produce a record of type AB, is there an
>> > simple way to do this with Avro without writing code to copy each field
>> > individually?
>> >
>> > I appreciate any help.
>> >
>> > Thanks,
>> >
>> > Roger
>> >
>> > {
>> >   "name": "A",
>> >   "type": "record",
>> >   "namespace": "fubar",
>> >   "fields": [{"name": "a", "type" : "int"}]
>> > }
>> >
>> > {
>> >   "name": "B",
>> >   "type": "record",
>> >   "namespace": "fubar",
>> >   "fields": [{"name": "b", "type" : "int"}]
>> > }
>> >
>> > {
>> >   "name": "AB",
>> >   "type": "record",
>> >   "namespace": "fubar",
>> >   "fields": [{"name": "a", "type" : "int"}, {"name": "b", "type" :
>> "int"}]
>> > }
>> >
>>
>
>


Re: Joining Avro records

2015-04-09 Thread Roger Hoover
Thanks, Julian.  Good point about needing aliasing for unique names in
SQL.  I didn't know about array_agg...nice.

On Thu, Apr 9, 2015 at 12:35 PM, Julian Hyde  wrote:

> Much of this is about mapping from logical fields (i.e. the fields you can
> reference in SQL) down to the Avro representation; I’m no expert on that
> mapping, so I’ll focus on the SQL stuff.
>
> First, SQL doesn’t allow a record to have two fields of the same name, so
> you wouldn’t be allowed to have two “name” fields. When you do a join, you
> might need to alias output columns:
>
> select stream orders.id, products.id as productId
> from orders
> join products on orders.id = products.id;
>
> Second, JOIN isn’t the only SQL operator that combines records; GROUP BY
> also combines records. JOIN combines records from different streams, and
> they usually have different types (i.e. different numbers/types of fields),
> whereas GROUP BY combines records from the same stream. Use whichever best
> suits your purpose.
>
> select stream zipcode, floor(rowtime to hour), array_agg(orderid) as
> orderIds
> from orders
> group by zipcode, floor(rowtime to hour)
>
> (array_agg is an aggregate function, recently added to the SQL standard,
> that gathers input values into an array. See
> http://www.craigkerstiens.com/2013/04/17/array-agg/.)
>
> Output:
>
> { zipcode: “94705”, rowtime: “2015-04-09 11:00:00”, orderIds: [123, 156,
> 1056] },
> { zipcode: “94117”, rowtime: “2015-04-09 11:00:00”, orderIds: [45, 777] },
> { zipcode: “94705”, rowtime: “2015-04-09 12:00:00”, orderIds: [55] }
>
> Julian
>
>
> On Apr 9, 2015, at 12:07 PM, Yi Pan  wrote:
>
> > Hi, Roger,
> >
> > Good question on that. I am actually not aware of any "automatic" way of
> > doing this in Avro. I have tried to add generic Schema and Data interface
> > in samza-sql branch to address the morphing of the schemas from input
> > streams to the output streams. The basic idea is to have wrapper Schema
> and
> > Data classes on-top-of the deserialized objects to access the data fields
> > according to the schema w/o changing and copying the actual data fields.
> > Hence, when there is a need to morph the input data schemas into a new
> > output data schema, we just need an implementation of the new output data
> > Schema class that can read the corresponding data fields from the input
> > data and write them out in the output schema. An interface function
> > transform() is added in the Schema class for this exact purpose.
> Currently,
> > it only takes one input data and one example of "projection"
> transformation
> > can be found in the implementation of AvroSchema class. A join case as
> you
> > presented may well be a reason to have an implementation of "join" with
> > multiple input data.
> >
> > All the above solution is still experimental and please feel free to
> > provide your feedback and comments on that. If we agree that this
> solution
> > is good and suit for a broader use case, it can be considered to be used
> > outside the "SQL" context as well.
> >
> > Best regards!
> >
> > -Yi
> >
> > On Thu, Apr 9, 2015 at 8:55 AM, Roger Hoover 
> wrote:
> >
> >> Hi Milinda and others,
> >>
> >> This is an Avro question but since you guys are working on Avro support
> for
> >> stream SQL, I thought I'd ask you for help.
> >>
> >> If I have a two records of type A and B as below and want to join them
> >> similar to "SELECT *" in SQL to produce a record of type AB, is there an
> >> simple way to do this with Avro without writing code to copy each field
> >> individually?
> >>
> >> I appreciate any help.
> >>
> >> Thanks,
> >>
> >> Roger
> >>
> >> {
> >>  "name": "A",
> >>  "type": "record",
> >>  "namespace": "fubar",
> >>  "fields": [{"name": "a", "type" : "int"}]
> >> }
> >>
> >> {
> >>  "name": "B",
> >>  "type": "record",
> >>  "namespace": "fubar",
> >>  "fields": [{"name": "b", "type" : "int"}]
> >> }
> >>
> >> {
> >>  "name": "AB",
> >>  "type": "record",
> >>  "namespace": "fubar",
> >>  "fields": [{"name": "a", "type" : "int"}, {"name": "b", "type" :
> "int"}]
> >> }
> >>
>
>


Re: Joining Avro records

2015-04-09 Thread Roger Hoover
Yi Pan,

Thanks for your response.  I'm thinking that I'll iterate over the fields
of the input schemas (similar to this
https://github.com/apache/samza/blob/samza-sql/samza-sql/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java#L58-L62),
match them up with the output schema and then copy the values.  I'll let
you know how it goes in case it's useful.
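
Roughly what I have in mind, as a minimal sketch using Avro's GenericRecordBuilder (it ignores field-name collisions, nested records, and type mismatches):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

public class FieldCopyJoin {
  // Copy every field of each input that also exists (by name) in the output schema.
  public static GenericRecord join(Schema outputSchema, GenericRecord... inputs) {
    GenericRecordBuilder builder = new GenericRecordBuilder(outputSchema);
    for (GenericRecord input : inputs) {
      for (Schema.Field inField : input.getSchema().getFields()) {
        String name = inField.name();
        if (outputSchema.getField(name) != null) {
          builder.set(name, input.get(name));
        }
      }
    }
    return builder.build();
  }
}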

Cheers,

Roger

On Thu, Apr 9, 2015 at 12:07 PM, Yi Pan  wrote:

> Hi, Roger,
>
> Good question on that. I am actually not aware of any "automatic" way of
> doing this in Avro. I have tried to add generic Schema and Data interface
> in samza-sql branch to address the morphing of the schemas from input
> streams to the output streams. The basic idea is to have wrapper Schema and
> Data classes on-top-of the deserialized objects to access the data fields
> according to the schema w/o changing and copying the actual data fields.
> Hence, when there is a need to morph the input data schemas into a new
> output data schema, we just need an implementation of the new output data
> Schema class that can read the corresponding data fields from the input
> data and write them out in the output schema. An interface function
> transform() is added in the Schema class for this exact purpose. Currently,
> it only takes one input data and one example of "projection" transformation
> can be found in the implementation of AvroSchema class. A join case as you
> presented may well be a reason to have an implementation of "join" with
> multiple input data.
>
> All the above solution is still experimental and please feel free to
> provide your feedback and comments on that. If we agree that this solution
> is good and suit for a broader use case, it can be considered to be used
> outside the "SQL" context as well.
>
> Best regards!
>
> -Yi
>
> On Thu, Apr 9, 2015 at 8:55 AM, Roger Hoover 
> wrote:
>
> > Hi Milinda and others,
> >
> > This is an Avro question but since you guys are working on Avro support
> for
> > stream SQL, I thought I'd ask you for help.
> >
> > If I have a two records of type A and B as below and want to join them
> > similar to "SELECT *" in SQL to produce a record of type AB, is there an
> > simple way to do this with Avro without writing code to copy each field
> > individually?
> >
> > I appreciate any help.
> >
> > Thanks,
> >
> > Roger
> >
> > {
> >   "name": "A",
> >   "type": "record",
> >   "namespace": "fubar",
> >   "fields": [{"name": "a", "type" : "int"}]
> > }
> >
> > {
> >   "name": "B",
> >   "type": "record",
> >   "namespace": "fubar",
> >   "fields": [{"name": "b", "type" : "int"}]
> > }
> >
> > {
> >   "name": "AB",
> >   "type": "record",
> >   "namespace": "fubar",
> >   "fields": [{"name": "a", "type" : "int"}, {"name": "b", "type" :
> "int"}]
> > }
> >
>


Joining Avro records

2015-04-09 Thread Roger Hoover
Hi Milinda and others,

This is an Avro question but since you guys are working on Avro support for
stream SQL, I thought I'd ask you for help.

If I have two records of type A and B as below and want to join them
similar to "SELECT *" in SQL to produce a record of type AB, is there a
simple way to do this with Avro without writing code to copy each field
individually?

I appreciate any help.

Thanks,

Roger

{
  "name": "A",
  "type": "record",
  "namespace": "fubar",
  "fields": [{"name": "a", "type" : "int"}]
}

{
  "name": "B",
  "type": "record",
  "namespace": "fubar",
  "fields": [{"name": "b", "type" : "int"}]
}

{
  "name": "AB",
  "type": "record",
  "namespace": "fubar",
  "fields": [{"name": "a", "type" : "int"}, {"name": "b", "type" : "int"}]
}


Re: Newbie questions after completing "Hello Samza" about performance and project setup

2015-04-09 Thread Roger Hoover
Hi Warren,

Yes, I think Hello Samza is the template project to work from.  I believe
that the slow message rate that you are seeing is because it's subscribed
to the Wikipedia IRC stream, which may only generate a few events per
second.

That said, some of the example configuration for the hello samza demo is
not tuned for performance.

In general, enabling compression can help a lot for jobs that are I/O
bound.  Enabling lz4 on JSON data, for example, shrinks it 10x.

On the consumer side, setting  task.consumer.batch.size might help.

On the producer side, you might want to play around with these settings.

systems.kafka.producer.compression.type
systems.kafka.producer.batch.size
systems.kafka.producer.linger.ms

http://samza.apache.org/learn/documentation/0.9/jobs/configuration-table.html
http://kafka.apache.org/documentation.html#newproducerconfigs
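
For example, something along these lines in the job properties (the values are only illustrative starting points to experiment with, not tuned recommendations):

# consumer side: hand larger batches to process()
task.consumer.batch.size=100

# producer side: compress and batch writes to Kafka
systems.kafka.producer.compression.type=lz4
systems.kafka.producer.batch.size=65536
systems.kafka.producer.linger.ms=10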

Cheers,

Roger

On Thu, Apr 9, 2015 at 1:14 AM, Warren Henning 
wrote:

> Hi,
>
> I ran the commands in http://samza.apache.org/startup/hello-samza/0.9/
> successfully. Fascinating stuff!
>
> I was running all the processes on my (fairly recent model) Macbook Pro.
> One aspect I've heard about Kafka and Samza is performance -- handling
> thousands of messages a second. E.g.,
>
> http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
> talks about doing millions of writes a second. The rate at which the
> console emitted new messages seemed like a rate far slower than that --
> maybe something on the order of 1-2 a second. I ran the commands and
> everything exactly as is listed on the tutorial page.
>
> Of course a laptop is vastly different from a production setup -- what kind
> of assumptions can you make about performance of Samza jobs in development
> mode? I realize it depends on what you're doing -- it's just very different
> from what I was expecting.
>
> Also, I'm not really sure about the best way to get started with writing my
> own Samza jobs. Is there a project template to work off of? Is the Hello
> Samza project it? Maybe import the Maven POM into a favorite IDE and rip
> out the Wikipedia-related classes? As someone who has written Java before
> but doesn't write it every day, it wasn't immediately clear to me.
>
> Apologies if these are addressed in blog posts/FAQs/documentation and I
> failed to research them adequately.
>
> Thanks!
>
> Warren
>


Re: How do you serve the data computed by Samza?

2015-04-02 Thread Roger Hoover
Thanks for the great explanation, Felix!

On Thu, Apr 2, 2015 at 4:08 PM, Felix GV 
wrote:

> Hi Roger,
>
> The slow storage shard situation is indeed a concern.
>
> The slow storage shard will back up your pusher process for all shards if
> the incoming Kafka stream partitions don't line up. Alternatively, your
> pusher process will keep going with the healthy shards but will then need
> to re-consume the whole input stream just to push to the unhealthy shard
> after it recovered. LinkedIn already has (at least) one or two such (push)
> systems and they definitely work, but they have some operational
> limitations, as discussed here.
>
> Now, you can make the pusher process' input topic partitions line up with
> the storage partitions, and that would alleviate the slow shard problem,
> but if you go through the trouble of partitioning your input stream this
> way, then you're not really far off from the pull model anyway.
> Furthermore, even with lined up partitioning, the push approach will still
> have the following caveats compared to pull:
>
>   1.  An extra hop. The hop is probably not a big deal in terms of
> latency, but it does double your bandwidth requirements (and more than
> double if the storage system does not support batch operations which are as
> efficient as Kafka's).
>   2.  Smart throttling. You can throttle with a push approach, but it is
> very crude. Likely the throttling would be a fixed QPS per topic or
> something like that. If you want to take into account the serving latency
> of the storage nodes, in order to back off when the storage node is under
> higher load, then it seems easier to achieve that accurately if you're
> co-located with the storage node, as you would be able to poll its
> performance metrics quicker and more often.
>   3.  Multi-tenancy. If you want many different topics getting fed into
> the storage system, but with different priorities for each, then it may
> also be easier to prioritize the various streams against one another in the
> pull model. If your push process consumes partitions that line up with the
> storage, then technically you can achieve the same thing, but then your
> push process deployment and config becomes a bit more complex. The pushers
> are not just any stateless auto-balanced consumer processes anymore, they
> need to be tied 1:1 or 1:M with the storage nodes. At this point, you
> already went about 80% of the way towards the pull model, so I would argue
> it makes things simpler to just go all the way.
>
> Of course, there's definitely many ways to skin this cat. And some of the
> concerns above shouldn't be someone's highest priority if they're just
> getting started playing around with Samza. But I think it would be nice to
> have an open-source system available which "does all the right things", so
> that even newcomers to Samza can have an easy way to ingest data into their
> serving system.
>
> My 2 cents.
>
> --
>
> Felix GV
> Data Infrastructure Engineer
> Distributed Data Systems
> LinkedIn
>
> f...@linkedin.com
> linkedin.com/in/felixgv
>
> 
> From: Roger Hoover [roger.hoo...@gmail.com]
> Sent: Thursday, April 02, 2015 3:24 PM
> To: dev@samza.apache.org
> Subject: Re: How do you serve the data computed by Samza?
>
> Is it because the Kafka partitioning might not be the same as the storage
> partitioning? So that a slow storage shard will prevent unrelated shards
> from getting their messages?
>
> Ah, I think I see what you mean. If so, then the solution is to make the
> Kafka partitioning match the storage partitioning. In that case, push or
> pull is the same, yeah?
>
> Thanks,
>
> Roger
>
> On Thu, Apr 2, 2015 at 3:21 PM, Roger Hoover 
> wrote:
>
> > Chinmay,
> >
> > Thanks for your input.
> >
> > I'm not understanding what the difference is. With the design that Felix
> > laid out, the co-located Kafka consumer is still doing a push to the
> > storage system, right?. It just happens to be on the same machine. How is
> > this different from pushing batches from a non-local Samza job? How does
> > the pull-based approach you're thinking of deal with feedback and SLAs?
> >
> > Thanks,
> >
> > Roger
> >
> >
> >
> > On Thu, Apr 2, 2015 at 2:54 PM, Chinmay Soman  >
> > wrote:
> >
> >> My 2 cents => One thing to note about the push model : multi-tenancy
> >>
> >> When your storage system (Druid for example) is used in a multi-tenant
> >> fashion - then push model is a bit difficult to operate. Primarily
> because
> &

Re: How do you serve the data computed by Samza?

2015-04-02 Thread Roger Hoover
Is it because the Kafka partitioning might not be the same as the storage
partitioning?  So that a slow storage shard will prevent unrelated shards
from getting their messages?

Ah, I think I see what you mean.  If so, then the solution is to make the
Kafka partitioning match the storage partitioning.  In that case, push or
pull is the same, yeah?

Thanks,

Roger

On Thu, Apr 2, 2015 at 3:21 PM, Roger Hoover  wrote:

> Chinmay,
>
> Thanks for your input.
>
> I'm not understanding what the difference is.  With the design that Felix
> laid out, the co-located Kafka consumer is still doing a push to the
> storage system, right?.  It just happens to be on the same machine.  How is
> this different from pushing batches from a non-local Samza job?   How does
> the pull-based approach you're thinking of deal with feedback and SLAs?
>
> Thanks,
>
> Roger
>
>
>
> On Thu, Apr 2, 2015 at 2:54 PM, Chinmay Soman 
> wrote:
>
>> My 2 cents => One thing to note about the push model : multi-tenancy
>>
>> When your storage system (Druid for example) is used in a multi-tenant
>> fashion - then push model is a bit difficult to operate. Primarily because
>> there is no real feedback loop from the storage system. Yes - if the
>> storage system starts doing bad - then you get timeouts and higher
>> latencies - but then you're already in a position where you're probably
>> breaking SLAs (for some tenant).
>>
>> In that sense, a pull model might be better since the consumer can
>> potentially have more visibility into how this particular node is doing.
>> Also, with the Kafka consumer batches things up - so theoretically - you
>> could get similar throughput. Downside of this approach is of course - the
>> storage system partitioning scheme *has to* line up with the Kafka
>> partitioning scheme.
>>
>> On Thu, Apr 2, 2015 at 11:41 AM, Roger Hoover 
>> wrote:
>>
>> > Felix,
>> >
>> > I see your point about simple Kafka consumers.  My thought was that if
>> > you're already managing a Samza/YARN deployment then these types of jobs
>> > would be "just another job" and not require an additional process
>> > management/monitoring/operations setup.  If you've already got a way to
>> > handle vanilla Kafka jobs then it makes sense.
>> >
>> > For the push model, the way we're planning to deal with the latency of
>> > round-trip calls is to batch up pushes to the downstream system.  Both
>> Druid
>> > Tranquility and the ES transport node protocol allow you to batch index
>> > requests.  I'm curious if pull would be that much more efficient.
>> >
>> > Cheers,
>> >
>> > Roger
>> >
>> > On Wed, Apr 1, 2015 at 10:26 AM, Felix GV
>> 
>> > wrote:
>> >
>> > > Hi Roger,
>> > >
>> > > You bring up good points, and I think the short answer is that there
>> are
>> > > trade-offs to everything, of course (:
>> > >
>> > > What I described could definitely be implemented as a Samza job, and I
>> > > think that would make a lot of sense if the data serving system was
>> also
>> > > deployed via YARN. This way, the Samza tasks responsible for ingesting
>> > and
>> > > populating the data serving system's nodes could be spawned wherever
>> YARN
>> > > knows these nodes are located. For data serving systems not well
>> > integrated
>> > > with YARN however, I'm not sure that there would be that much win in
>> > using
>> > > the Samza deployment model. And since the consumers themselves are
>> pretty
>> > > simple (no joining of streams, no local state, etc.), this seems to
>> be a
>> > > case where Samza is a bit overkill and a regular Kafka consumer is
>> > > perfectly fine (except for the YARN-enabled auto-deployment aspect,
>> like
>> > I
>> > > mentioned).
>> > >
>> > > As for push versus pull, I think the trade-off is the following: push
>> is
>> > > mostly simpler and more decoupled, as you said, but I think pull
>> would be
>> > > more efficient. The reason for that is that Kafka consumption is very
>> > > efficient (thanks to batching and compression), but most data serving
>> > > systems don't provide a streaming ingest API for pushing data
>> efficiently
>> > > to them, instead they have single record put/insert APIs which
>> re

Re: How do you serve the data computed by Samza?

2015-04-02 Thread Roger Hoover
Chinmay,

Thanks for your input.

I'm not understanding what the difference is.  With the design that Felix
laid out, the co-located Kafka consumer is still doing a push to the
storage system, right?  It just happens to be on the same machine.  How is
this different from pushing batches from a non-local Samza job?   How does
the pull-based approach you're thinking of deal with feedback and SLAs?

Thanks,

Roger



On Thu, Apr 2, 2015 at 2:54 PM, Chinmay Soman 
wrote:

> My 2 cents => One thing to note about the push model : multi-tenancy
>
> When your storage system (Druid for example) is used in a multi-tenant
> fashion - then push model is a bit difficult to operate. Primarily because
> there is no real feedback loop from the storage system. Yes - if the
> storage system starts doing bad - then you get timeouts and higher
> latencies - but then you're already in a position where you're probably
> breaking SLAs (for some tenant).
>
> In that sense, a pull model might be better since the consumer can
> potentially have more visibility into how this particular node is doing.
> Also, with the Kafka consumer batches things up - so theoretically - you
> could get similar throughput. Downside of this approach is of course - the
> storage system partitioning scheme *has to* line up with the Kafka
> partitioning scheme.
>
> On Thu, Apr 2, 2015 at 11:41 AM, Roger Hoover 
> wrote:
>
> > Felix,
> >
> > I see your point about simple Kafka consumers.  My thought was that if
> > you're already managing a Samza/YARN deployment then these types of jobs
> > would be "just another job" and not require an additional process
> > management/monitoring/operations setup.  If you've already got a way to
> > handle vanilla Kafka jobs then it makes sense.
> >
> > For the push model, the way we're planning to deal with the latency of
> > round-trip calls is to batch up pushes to the downstream system.  Both
> Druid
> > Tranquility and the ES transport node protocol allow you to batch index
> > requests.  I'm curious if pull would be that much more efficient.
> >
> > Cheers,
> >
> > Roger
> >
> > On Wed, Apr 1, 2015 at 10:26 AM, Felix GV  >
> > wrote:
> >
> > > Hi Roger,
> > >
> > > You bring up good points, and I think the short answer is that there
> are
> > > trade-offs to everything, of course (:
> > >
> > > What I described could definitely be implemented as a Samza job, and I
> > > think that would make a lot of sense if the data serving system was
> also
> > > deployed via YARN. This way, the Samza tasks responsible for ingesting
> > and
> > > populating the data serving system's nodes could be spawned wherever
> YARN
> > > knows these nodes are located. For data serving systems not well
> > integrated
> > > with YARN however, I'm not sure that there would be that much win in
> > using
> > > the Samza deployment model. And since the consumers themselves are
> pretty
> > > simple (no joining of streams, no local state, etc.), this seems to be
> a
> > > case where Samza is a bit overkill and a regular Kafka consumer is
> > > perfectly fine (except for the YARN-enabled auto-deployment aspect,
> like
> > I
> > > mentioned).
> > >
> > > As for push versus pull, I think the trade-off is the following: push
> is
> > > mostly simpler and more decoupled, as you said, but I think pull would
> be
> > > more efficient. The reason for that is that Kafka consumption is very
> > > efficient (thanks to batching and compression), but most data serving
> > > systems don't provide a streaming ingest API for pushing data
> efficiently
> > > to them, instead they have single record put/insert APIs which require
> a
> > > round-trip to be acknowledged. This is perfectly fine in low-throughput
> > > scenarios, but does not support very high throughput of ingestion like
> > > Kafka can provide. By co-locating the pulling process (i.e.: Kafka
> > > consumer) with the data serving node, it makes it a bit more affordable
> > to
> > > do single puts since the (local) round-trip acks would be
> > > near-instantaneous. Pulling also makes the tracking of offsets across
> > > different nodes a bit easier, since each node can consume at its own
> > pace,
> > > and resume at whatever point in the past it needs (i.e.: rewind)
> without
> > > affecting the other replicas. Tracking offsets across many replicas in
> > the
> > > push model is a bit mor

Re: How do you serve the data computed by Samza?

2015-04-02 Thread Roger Hoover
Felix,

I see your point about simple Kafka consumers.  My thought was that if
you're already managing a Samza/YARN deployment then these types of jobs
would be "just another job" and not require an additional process
management/monitoring/operations setup.  If you've already got a way to
handle vanilla Kafka jobs then it makes sense.

For the push model, the way we're planning to deal with the latency of
round-trip calls is to batch up pushes to the downstream system.  Both Druid
Tranquility and the ES transport node protocol allow you to batch index
requests.  I'm curious if pull would be that much more efficient.
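
To make the batching concrete, here is roughly the shape of it against the Elasticsearch 1.x Java transport client (the host, index, and type names are made up, and in a real job the docs would come from process()):

import java.util.List;
import java.util.Map;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class EsBulkSketch {
  public static Client connect() {
    return new TransportClient()
        .addTransportAddress(new InetSocketTransportAddress("es-host.example.com", 9300));
  }

  public static void indexBatch(Client client, List<Map<String, Object>> docs) {
    // One bulk request per batch instead of one round trip per document.
    BulkRequestBuilder bulk = client.prepareBulk();
    for (Map<String, Object> doc : docs) {
      bulk.add(client.prepareIndex("events", "event").setSource(doc));
    }
    BulkResponse response = bulk.execute().actionGet();
    if (response.hasFailures()) {
      throw new RuntimeException("Bulk indexing failed: " + response.buildFailureMessage());
    }
  }
}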

Cheers,

Roger

On Wed, Apr 1, 2015 at 10:26 AM, Felix GV 
wrote:

> Hi Roger,
>
> You bring up good points, and I think the short answer is that there are
> trade-offs to everything, of course (:
>
> What I described could definitely be implemented as a Samza job, and I
> think that would make a lot of sense if the data serving system was also
> deployed via YARN. This way, the Samza tasks responsible for ingesting and
> populating the data serving system's nodes could be spawned wherever YARN
> knows these nodes are located. For data serving systems not well integrated
> with YARN however, I'm not sure that there would be that much win in using
> the Samza deployment model. And since the consumers themselves are pretty
> simple (no joining of streams, no local state, etc.), this seems to be a
> case where Samza is a bit overkill and a regular Kafka consumer is
> perfectly fine (except for the YARN-enabled auto-deployment aspect, like I
> mentioned).
>
> As for push versus pull, I think the trade-off is the following: push is
> mostly simpler and more decoupled, as you said, but I think pull would be
> more efficient. The reason for that is that Kafka consumption is very
> efficient (thanks to batching and compression), but most data serving
> systems don't provide a streaming ingest API for pushing data efficiently
> to them, instead they have single record put/insert APIs which require a
> round-trip to be acknowledged. This is perfectly fine in low-throughput
> scenarios, but does not support very high throughput of ingestion like
> Kafka can provide. By co-locating the pulling process (i.e.: Kafka
> consumer) with the data serving node, it makes it a bit more affordable to
> do single puts since the (local) round-trip acks would be
> near-instantaneous. Pulling also makes the tracking of offsets across
> different nodes a bit easier, since each node can consume at its own pace,
> and resume at whatever point in the past it needs (i.e.: rewind) without
> affecting the other replicas. Tracking offsets across many replicas in the
> push model is a bit more annoying, though still doable, of course.
>
> --
>
> Felix GV
> Data Infrastructure Engineer
> Distributed Data Systems
> LinkedIn
>
> f...@linkedin.com
> linkedin.com/in/felixgv
>
> 
> From: Roger Hoover [roger.hoo...@gmail.com]
> Sent: Tuesday, March 31, 2015 8:57 PM
> To: dev@samza.apache.org
> Subject: Re: How do you serve the data computed by Samza?
>
> Ah, thanks for the great explanation.  Any particular reason that the
> job(s) you described should not be Samza jobs?
>
> We're started experimenting with such jobs for Druid and Elasticsearch.
> For Elasticsearch, the Samza job containers join the Elasticsearch cluster
> as transport nodes and use the Java API to push ES data nodes.  Likewise
> for Druid, the Samza job uses the Tranquility API to schedule jobs (
>
> https://github.com/metamx/tranquility/tree/master/src/main/scala/com/metamx/tranquility/samza
> ).
>
> The nice part about push versus pull is that the downstream system does not
> need plugins (like ES rivers) that may complicate it's configuration or
> destabilize the system.
>
> Cheers,
>
> Roger
>
> On Tue, Mar 31, 2015 at 10:56 AM, Felix GV  >
> wrote:
>
> > Thanks for your reply Roger! Very insightful (:
> >
> > > 6. If there was a highly-optimized and reliable way of ingesting
> > > partitioned streams quickly into your online serving system, would that
> > > help you leverage Samza more effectively?
> >
> > >> 6. Can you elaborate please?
> >
> > Sure. The feature set I have in mind is the following:
> >
> >   *   Provide a thinly-wrapped Kafka producer which does appropriate
> > partitioning and includes useful metadata (such as production timestamp,
> > etc.) alongside the payload. This producer would be used in the last step
> > of processing of a Samza topology, in order to emit to Kafka some
> > processed/joined/enriched data which is destined for 

Re: How do you serve the data computed by Samza?

2015-03-31 Thread Roger Hoover
Ah, thanks for the great explanation.  Any particular reason that the
job(s) you described should not be Samza jobs?

We've started experimenting with such jobs for Druid and Elasticsearch.
For Elasticsearch, the Samza job containers join the Elasticsearch cluster
as transport nodes and use the Java API to push data to the ES data nodes.  Likewise
for Druid, the Samza job uses the Tranquility API to schedule jobs (
https://github.com/metamx/tranquility/tree/master/src/main/scala/com/metamx/tranquility/samza
).

The nice part about push versus pull is that the downstream system does not
need plugins (like ES rivers) that may complicate its configuration or
destabilize the system.

Cheers,

Roger

On Tue, Mar 31, 2015 at 10:56 AM, Felix GV 
wrote:

> Thanks for your reply Roger! Very insightful (:
>
> > 6. If there was a highly-optimized and reliable way of ingesting
> > partitioned streams quickly into your online serving system, would that
> > help you leverage Samza more effectively?
>
> >> 6. Can you elaborate please?
>
> Sure. The feature set I have in mind is the following:
>
>   *   Provide a thinly-wrapped Kafka producer which does appropriate
> partitioning and includes useful metadata (such as production timestamp,
> etc.) alongside the payload. This producer would be used in the last step
> of processing of a Samza topology, in order to emit to Kafka some
> processed/joined/enriched data which is destined for online serving.
>   *   Provide a consumer process which can be co-located on the same hosts
> as your data serving system. This process consumes from the appropriate
> partitions and checkpoints its offsets on its own. It leverages Kafka
> batching and compression to make consumption very efficient.
>   *   For each record the consumer process issues a put/insert locally to
> the co-located serving process. Since this is a local operation, it is also
> very cheap and efficient.
>   *   The consumer process can also optionally throttle its insertion rate
> by monitoring some performance metrics of the co-located data serving
> process. For example, if the data serving process exposes a p99 latency via
> JMX or other means, this can be used in a tight feedback loop to back off
> if read latency degrades beyond a certain threshold.
>   *   This ingestion platform should be easy to integrate with any
> consistently-routed data serving system, by implementing some simple
> interfaces to let the ingestion system understand the key-to-partition
> assignment strategy, as well as the partition-to-node assignment strategy.
> Optionally, a hook to access performance metrics could also be implemented
> if throttling is deemed important (as described in the previous point).
>   *   Since the consumer process lives in a separate process, the system
> benefits from good isolation guarantees. The consumer process can be capped
> to a low amount of heap, and its GC is inconsequential for the serving
> platform. It's also possible to bounce the consumer and data serving
> processes independently of each other, if need be.
>
> There are some more nuances and additional features which could be nice to
> have, but that's the general idea.
>
>
> It seems to me like such system would be valuable, but I'm wondering what
> other people in the open-source community think, hence why I was interested
> in starting this thread...
>
>
> Thanks for your feedback!
>
> -F
>


Re: [VOTE] Apache Samza 0.9.0 RC0

2015-03-31 Thread Roger Hoover
Nice.  Thanks Yan!

On Tue, Mar 31, 2015 at 3:24 PM, Yan Fang  wrote:

> Cool.
>
> * Published to maven, it's already there.
> * Uploaded to dist/release. It may take a while for mirrors to pick it up.
> * Updated the downloading page in
> https://issues.apache.org/jira/browse/SAMZA-624
> ** Will publish the website after mirrors pick up the 0.9.0 release
> ** In terms of the blog, seem not have the access ?
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Tue, Mar 31, 2015 at 1:36 PM, Chris Riccomini 
> wrote:
>
> > Hey Yan/Jakob,
> >
> > Awesome, thanks! Yan, feel free to finish up the release. :) Very cool!
> >
> > Cheers,
> > Chris
> >
> > On Tue, Mar 31, 2015 at 1:27 PM, Jakob Homan  wrote:
> >
> > > Correct.  All that's necessary for a release is a
> > > more-+1s-than--1s-from-PMC-members vote, and then we can go ahead with
> > > distribution, publicity, etc.
> > > -jg
> > >
> > > On 31 March 2015 at 12:44, Chris Riccomini 
> > wrote:
> > > > Hey Yan,
> > > >
> > > > Let's confirm with Jakob. I *think* we don't need any intervention
> from
> > > > Apache. We should be able to move forward with the release. @Jakob,
> can
> > > you
> > > > confirm this?
> > > >
> > > > Cheers,
> > > > Chris
> > > >
> > > > On Tue, Mar 31, 2015 at 11:17 AM, Yan Fang 
> > wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > >> After 72+ hours, we got +4 binding votes (Chris, Jakob, Chinmay,
> Yan)
> > ,
> > > +2
> > > >> non-binding votes (Roger, Yi Pan). The release vote passes.
> > > >>
> > > >> @Chris, Do we need the vote from apache general mailing list? Or I
> can
> > > go
> > > >> ahead to update to release dist, update download page, publish 0.9.0
> > > >> binaries to Maven and write 0.9.0 blog post?
> > > >>
> > > >> Thanks,
> > > >> Fang, Yan
> > > >> yanfang...@gmail.com
> > > >>
> > > >> On Tue, Mar 31, 2015 at 9:21 AM, Ash W Matheson <
> > ash.mathe...@gmail.com
> > > >
> > > >> wrote:
> > > >>
> > > >>> I'd say yes, it's been a few days with little traffic on the topic.
> > > >>> On Mar 31, 2015 9:18 AM, "Chris Riccomini" 
> > > wrote:
> > > >>>
> > > >>> > Hey all,
> > > >>> >
> > > >>> > Is the vote done?
> > > >>> >
> > > >>> > Cheers,
> > > >>> > Chris
> > > >>> >
> > > >>> > On Mon, Mar 30, 2015 at 2:10 PM, Chris Riccomini <
> > > criccom...@apache.org
> > > >>> >
> > > >>> > wrote:
> > > >>> >
> > > >>> > > +1
> > > >>> > >
> > > >>> > > 1. Validated hello-samza works with 0.9.0 Maven binaries.
> > > >>> > > 2. Validated release-0.9.0-rc0 tag exists and has correct
> > > checksums.
> > > >>> > > 3. Validated source release tarball builds, and has correct
> > > licenses
> > > >>> > > (bin/check-all.sh).
> > > >>> > > 4. Validated source release tarball validates against Yan's PGP
> > > key.
> > > >>> > > 5. Ran rolling bounce of Kafka cluster with large job (1
> million+
> > > >>> > > messages/sec)
> > > >>> > > 6. Ran Zopkio integration tests, and SAMZA-394 torture test.
> > > >>> > >
> > > >>> > > For (6), I ran the SAMZA-394 tests for > 72 hours with torture
> > test
> > > >>> > > running. No consistency/data loss issues! I did find an issue
> > with
> > > the
> > > >>> > > checker integration test, but I think it's best left for
> 0.10.0,
> > so
> > > >>> I'll
> > > >>> > > open a JIRA to track that.
> > > >>> > >
> > > >>> > >
> > > >>> > > On Mon, Mar 30, 2015 at 10:49 AM, Roger Hoover <
> > > >>> roger.hoo...@gmail.com>
> > > >>> > > wrote:
> > > >>> > >
> 

Re: How do you serve the data computed by Samza?

2015-03-31 Thread Roger Hoover
Hi Felix,

1,3. We're experimenting with both Druid and Elasticsearch for this.  We're
using Samza to enrich user activity and system performance events and then
index them in Druid and/or Elasticsearch, depending on the use case.
2. These are internal BI/Operations applications
4. We're still getting up to speed on both Druid and Elasticsearch to get
the necessary write throughput.  Read throughput has not been an issue.
5. Not yet, but we don't expect it will be easy
6. Can you elaborate please?

Cheers,

Roger

On Fri, Mar 27, 2015 at 9:52 AM, Felix GV 
wrote:

> Hi Samza devs, users and enthusiasts,
>
> I've kept an eye on the Samza project for a while and I think it's super
> cool! I hope it continues to mature and expand as it seems very promising (:
>
> One thing I've been wondering for a while is: how do people serve the data
> they computed on Samza? More specifically:
>
>   1.  How do you expose the output of Samza jobs to online applications
> that need low-latency reads?
>   2.  Are these online apps mostly internal (i.e.: analytics, dashboards,
> etc.) or public/user-facing?
>   3.  What systems do you currently use (or plan to use in the short-term)
> to host the data generated in Samza? HBase? Cassandra? MySQL? Druid? Others?
>   4.  Are you satisfied or are you facing challenges in terms of the write
> throughput supported by these storage/serving systems? What about read
> throughput?
>   5.  Are there situations where you wish to re-process all historical
> data when making improvements to your Samza job, which results in the need
> to re-ingest all of the Samza output into your online serving system (as
> described in the Kappa Architecture<
> http://radar.oreilly.com/2014/07/questioning-the-lambda-architecture.html>)
> ? Is this easy breezy or painful? Do you need to throttle it lest your
> serving system will fall over?
>   6.  If there was a highly-optimized and reliable way of ingesting
> partitioned streams quickly into your online serving system, would that
> help you leverage Samza more effectively?
>
> Your insights would be much appreciated!
>
>
> Thanks (:
>
>
> --
> Felix
>


Re: [VOTE] Apache Samza 0.9.0 RC0

2015-03-30 Thread Roger Hoover
+1

* Created and tested a sample job doing a join
* Built packages
* Couldn't get integration tests to work.  It seemed like it was timing out
trying to download dependencies.  I may have been having network issues
last night.

Cheers,

Roger

On Sun, Mar 29, 2015 at 9:09 PM, Jakob Homan  wrote:

> +1 (binding)
>
> * Verified sig and checksum
> * Spot checked files, verified license and notice
> * Built packages
> * Ran hello samza
>
> Good work, Yan.
> -jg
>
>
> On 29 March 2015 at 13:08, Chinmay Soman 
> wrote:
> > +1
> >
> > * Verified signature against the release
> > * Verified authenticity of the key
> > * Ran hello-samza (latest branch) - all 3 jobs succeed againt 0.9.0
> release
> > * Ran integration tests - all 3 tests pass (after addresing SAMZA-621.
> > Also, once a failure occurs - I have to manually kill the Yarn daemons.
> Not
> > sure if there's a ticket open for that - a quick search did not reveal
> > anything).
> >
> >
> > Good job guys !
> >
> > On Sat, Mar 28, 2015 at 1:36 AM, Yan Fang  wrote:
> >
> >> Hi Chris and Jakob,
> >>
> >> Sure. Let's do the voting until Monday. Hope more guys have time to try
> and
> >> validate the 0.9.0 version.
> >>
> >> Thanks,
> >>
> >> Fang, Yan
> >> yanfang...@gmail.com
> >> +1 (206) 849-4108
> >>
> >> On Fri, Mar 27, 2015 at 5:09 PM, Chris Riccomini  >
> >> wrote:
> >>
> >> > Hey Yan,
> >> >
> >> > Yea, could we delay until Monday? I have been doing a lot of burn-in,
> and
> >> > have found some issues with the torture tests in SAMZA-394. The issues
> >> > appear to be with the tests themselves, not data-loss/correctness
> issues
> >> in
> >> > Samza, but I just want to make sure. Planning to run the burn-in over
> the
> >> > weekend. I'll open up JIRAs to fix the SAMZA-394 torture tests once
> I'm
> >> > confident it's stable.
> >> >
> >> > Cheers,
> >> > Chris
> >> >
> >> > On Fri, Mar 27, 2015 at 4:28 PM, Jakob Homan 
> wrote:
> >> >
> >> > > That would be great.  I've been trying to get to this but have
> failed.
> >> > > I can definitely look at the release tomorrow.
> >> > > -jg
> >> > >
> >> > >
> >> > > On 27 March 2015 at 16:08, Yan Fang  wrote:
> >> > > > Hi guys,
> >> > > >
> >> > > > It has been 72 hours. We got +1 from Yi Pan. Do we extend the
> voting
> >> to
> >> > > > this weekend ?
> >> > > >
> >> > > > Thanks,
> >> > > > Fang, Yan
> >> > > > yanfang...@gmail.com
> >> > > > +1 (206) 849-4108
> >> > > >
> >> > > > On Thu, Mar 26, 2015 at 11:07 PM, Yi Pan 
> >> wrote:
> >> > > >
> >> > > >> I have ran the integration test suite w/ 0.9.0-rc0. There were
> some
> >> > > issues
> >> > > >> related w/ the integration test: SAMZA-621, but the test suite
> >> passed
> >> > > after
> >> > > >> I manually created a symlink to the file name the test script is
> >> > looking
> >> > > >> for.
> >> > > >>
> >> > > >> Hence, +1 on the release.
> >> > > >>
> >> > > >> On Thu, Mar 26, 2015 at 5:39 PM, Roger Hoover <
> >> roger.hoo...@gmail.com
> >> > >
> >> > > >> wrote:
> >> > > >>
> >> > > >> > Hi Chris + all,
> >> > > >> >
> >> > > >> > I created a basic job that does a join from local state with
> Samza
> >> > > 0.9.0
> >> > > >> (
> >> > > >> >
> >> https://github.com/Quantiply/rico-playground/tree/master/join/samza
> >> > ).
> >> > > >> So
> >> > > >> > far so good. I hoping to get some time this weekend to
> benchmark
> >> it
> >> > > on my
> >> > > >> > laptop.  I think I saw that 0.9.0 includes support for sending
> job
> >> > > logs
> >> > > >> to
> >> > > >> > a topic. I want to try this out as well.
> >> > > >&g

Re: [VOTE] Apache Samza 0.9.0 RC0

2015-03-26 Thread Roger Hoover
Hi Chris + all,

I created a basic job that does a join from local state with Samza 0.9.0 (
https://github.com/Quantiply/rico-playground/tree/master/join/samza).  So
far so good. I hoping to get some time this weekend to benchmark it on my
laptop.  I think I saw that 0.9.0 includes support for sending job logs to
a topic. I want to try this out as well.
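
For anyone curious, the join is just the usual stream-to-table pattern against a local key-value store. Very roughly the shape below (the stream, store, and field names are simplified placeholders, not the actual code in that repo):

import java.util.Map;
import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class StreamTableJoinTask implements StreamTask, InitableTask {
  private static final SystemStream OUTPUT = new SystemStream("kafka", "orders-enriched");

  private KeyValueStore<String, Map<String, Object>> userStore;

  @Override
  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    userStore = (KeyValueStore<String, Map<String, Object>>) context.getStore("user-profiles");
  }

  @Override
  @SuppressWarnings("unchecked")
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    String stream = envelope.getSystemStreamPartition().getStream();
    Map<String, Object> msg = (Map<String, Object>) envelope.getMessage();

    if ("users".equals(stream)) {
      // table side: keep the latest profile per user in the local store
      userStore.put((String) msg.get("user_id"), msg);
    } else {
      // stream side: look up the profile and emit the joined record
      Map<String, Object> profile = userStore.get((String) msg.get("user_id"));
      if (profile != null) {
        msg.put("user_name", profile.get("name"));
      }
      collector.send(new OutgoingMessageEnvelope(OUTPUT, msg.get("user_id"), msg));
    }
  }
}

Both input streams are partitioned by user_id, so each task only ever sees the profiles it needs for the events it gets.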

Cheers,

Roger

On Thu, Mar 26, 2015 at 5:25 PM, Chris Riccomini 
wrote:

> Hey all,
>
> I'm running validations and some burn-in. I'll post my vote tomorrow.
>
> It's been pretty quiet. It'd be good to get other committers/non-committers
> to do validation as well.
>
> Cheers,
> Chris
>
>
> On Wed, Mar 25, 2015 at 11:20 AM, Yan Fang  wrote:
>
> > Hi Chris,
> >
> > Opps, signed it with another key. Now updated all files in
> > http://people.apache.org/~yanfang/samza-0.9.0-rc0/ . Verified. Sorry for
> > the inconvenience.
> >
> > Cheers,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> > +1 (206) 849-4108
> >
> > On Wed, Mar 25, 2015 at 10:58 AM, Chris Riccomini  >
> > wrote:
> >
> > > Hey Yan,
> > >
> > > Were you able to validate the source tarball? I ran:
> > >
> > > $ gpg --keyserver pgpkeys.mit.edu --recv-key CAC06239EA00BA80
> > > gpg: requesting key EA00BA80 from hkp server pgpkeys.mit.edu
> > > gpg: key EA00BA80: public key "Yan Fang (CODE SIGNING KEY) <
> > > yanf...@apache.org>" imported
> > > gpg: Total number processed: 1
> > > gpg:   imported: 1  (RSA: 1)
> > >
> > > $ gpg --fingerprint CAC06239EA00BA80
> > > pub   4096R/EA00BA80 2015-03-24 [expires: 2020-03-22]
> > >   Key fingerprint = 7091 46DA 2CF3 EACF 476E  B077 CAC0 6239 EA00
> > BA80
> > > uid  Yan Fang (CODE SIGNING KEY) 
> > > sub   4096R/E3F3DAD3 2015-03-24 [expires: 2020-03-22]
> > >
> > > $ gpg apache-samza-0.9.0-src.tgz.asc
> > > gpg: Signature made Tue Mar 24 11:51:58 2015 PDT using RSA key ID
> > 0CAE52EA
> > > gpg: Can't check signature: public key not found
> > >
> > > Cheers,
> > > Chris
> > >
> > > On Tue, Mar 24, 2015 at 3:18 PM, Yan Fang 
> wrote:
> > >
> > > > Hey all,
> > > >
> > > > This is a call for a vote on a release of Apache Samza 0.9.0. This is
> > our
> > > > first release as the Apache top-level project. Thanks to everyone who
> > has
> > > > contributed to this release. We are very glad to see some new
> > > contributors
> > > > in this release.
> > > >
> > > > The release candidate can be downloaded from here:
> > > >
> > > > http://people.apache.org/~yanfang/samza-0.9.0-rc0/
> > > >
> > > > The release candidate is signed with pgp key CAC06239EA00BA80, which
> is
> > > > included in the repository's KEYS file:
> > > >
> > > >
> > > >
> > >
> >
> https://git-wip-us.apache.org/repos/asf?p=samza.git;a=blob_plain;f=KEYS;hb=6f5bafb6cd93934781161eb6b1868d11ea347c95
> > > >
> > > > and can also be found on keyservers:
> > > >
> > > > http://pgp.mit.edu/pks/lookup?op=get&search=0xCAC06239EA00BA80
> > > >
> > > > The git tag is release-0.9.0-rc0 and signed with the same pgp key:
> > > >
> > > >
> > > >
> > >
> >
> https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=1039f7ede6490f9420dcecd6adc7677b97e78bcf
> > > >
> > > > Test binaries have been published to Maven's staging repository, and
> > are
> > > > available here:
> > > >
> > > >
> > https://repository.apache.org/content/repositories/orgapachesamza-1005/
> > > >
> > > > Note that the binaries were built with JDK6 without incident.
> > > >
> > > > 95 issues were resolved for this release:
> > > >
> > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SAMZA%20AND%20fixVersion%20%3D%200.9.0%20AND%20status%20in%20(Resolved%2C%20Closed)
> > > >
> > > > The vote will be open for 72 hours ( end in 4:00pm Friday, 03/27/2015
> > ).
> > > > Please download the release candidate, check the hashes/signature,
> > build
> > > it
> > > > and test it, and then please vote:
> > > >
> > > > [ ] +1 approve
> > > > [ ] +0 no opinion
> > > > [ ] -1 disapprove (and reason why)
> > > >
> > > > +1 from my side for the release.
> > > >
> > > > Fang, Yan
> > > > yanfang...@gmail.com
> > > > +1 (206) 849-4108
> > > >
> > >
> >
>


Re: Kafka partition key

2015-03-26 Thread Roger Hoover
Hi Richard,

You can also partition by a key like "user_id" so that all messages for a
given user would end up in the same partition.  This can be useful for
calculating user-specific aggregations or doing a distributed join where
the local state is also partitioned on user_id.
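
Concretely, that just means using user_id as the message key when you send, so the key hash determines the partition. A rough sketch (the stream and field names are made up):

import java.util.Map;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class RepartitionByUserTask implements StreamTask {
  private static final SystemStream OUTPUT = new SystemStream("kafka", "events-by-user");

  @Override
  @SuppressWarnings("unchecked")
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    Map<String, Object> event = (Map<String, Object>) envelope.getMessage();
    // Keying by user_id means every event for a given user hashes to the
    // same partition of the output topic.
    collector.send(new OutgoingMessageEnvelope(OUTPUT, event.get("user_id"), event));
  }
}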

Cheers,

Roger

On Thu, Mar 26, 2015 at 9:28 AM, Richard Lee  wrote:

> Is there a typo below?  Are all of these actually in the same topic, just
> different partitions?  Partitioning, AFAIK, is mainly done for parallelism
> & throughput reasons.  What is the reason for partitioning your dataset by
> ‘columns’?
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIchoosethenumberofpartitionsforatopic
> ?
>
> Richard
>
> > On Mar 26, 2015, at 8:22 AM, Shekar Tippur  wrote:
> >
> > Hello,
> >
> > Want to confirm a basic understanding of Kafka.
> > If I have a dataset that needs to be partitioned by 4 columns, then the
> > progression is
> >
> > {topic1:partition_key1} -> {Group by samza on partition_key1}
> > ->
> > {topic2:partition_key2} -> {Group by samza on partition_key2}
> > ->
> > {topic3:partition_key3} -> {Group by samza on partition_key3}
> > ->
> > {topic4:partition_key4} -> {Group by samza on partition_key4}
> >
> > Can you please confirm if my understanding is right?
> >
> > - Shekar
>


Re: java.lang.NoClassDefFoundError on Yarn job

2015-03-26 Thread Roger Hoover
Hi Jordi,

You might be running into this issue (
https://issues.apache.org/jira/browse/SAMZA-456) which I just hit as well.
You probably need to add a couple more jars to your YARN lib dir.
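
The missing class in your stack trace (org.apache.hadoop.conf.Configuration) lives in hadoop-common, so it is roughly a matter of copying the Hadoop client jars into the lib/ directory that ends up on run-job.sh's classpath. Exact jar names and paths depend on your Hadoop version and layout, but something like:

cp $HADOOP_HOME/share/hadoop/common/hadoop-common-*.jar /opt/jobs/lib/
cp $HADOOP_HOME/share/hadoop/hdfs/hadoop-hdfs-*.jar /opt/jobs/lib/

plus whatever other hadoop/yarn client jars it still complains about on the next run.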

Cheers,

Roger

On Thu, Mar 26, 2015 at 9:21 AM, Jordi Blasi Uribarri 
wrote:

> Hi:
>
> I got samza running a job in local mode with the property:
> job.factory.class=org.apache.samza.job.local.ThreadJobFactory
>
> Now I am trying to get it running in multiple machines. I have followed
> the steps in the following guide:
>
>
> https://github.com/apache/samza/blob/master/docs/learn/tutorials/versioned/run-in-multi-node-yarn.md
>
> I see the node up and running.
>
> I have created a tar.gz file with the contents of the bin and lib folders
> that were running locally Yarn and published it in a local Apache2 web
> server. The properties file looks like this:
>
> task.class=samzafroga.job1
> job.name=samzafroga.job1
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> yarn.package.path= http://192.168.15.92/jobs/samzajob1.tar.gz
>
>
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.consumer.zookeeper.connect= broker01:2181
> systems.kafka.producer.bootstrap.servers= broker01:9092
>
> task.inputs=kafka.syslog
>
> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
>
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> systems.kafka.streams.syslog.samza.msg.serde=string
> systems.kafka.streams.samzaout.samza.msg.serde=string
>
> When I run the same command that was working in the local mode:
> bin/run-job.sh
> --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
> --config-path=file://$PWD/job1.properties
>
> I see the following exception:
> java version "1.7.0_75"
> OpenJDK Runtime Environment (IcedTea 2.5.4) (7u75-2.5.4-2)
> OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)
> /usr/lib/jvm/java-7-openjdk-amd64/bin/java
> -Dlog4j.configuration=file:bin/log4j-console.xml -Dsamza.log.dir=/opt/jobs
> -Djava.io.tmpdir=/opt/jobs/tmp -Xmx768M -XX:+PrintGCDateStamps
> -Xloggc:/opt/jobs/gc.log -XX:+UseGCLogFileRotation
> -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024 -d64 -cp
> /opt/hadoop/conf:/opt/jobs/lib/samzafroga-0.0.1-jar-with-dependencies.jar
> org.apache.samza.job.JobRunner
> --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
> --config-path=file:///opt/jobs/job1.properties
> log4j: reset attribute= "false".
> log4j: Threshold ="null".
> log4j: Level value for root is  [INFO].
> log4j: root level set to INFO
> log4j: Class name: [org.apache.log4j.ConsoleAppender]
> log4j: Parsing layout of class: "org.apache.log4j.PatternLayout"
> log4j: Setting property [conversionPattern] to [%d{dd MMM  HH:mm:ss}
> %5p %c{1} - %m%n].
> log4j: Adding appender named [consoleAppender] to category [root].
> log4j: Class name: [org.apache.log4j.RollingFileAppender]
> log4j: Setting property [append] to [false].
> log4j: Setting property [file] to [out/learning.log].
> log4j: Parsing layout of class: "org.apache.log4j.PatternLayout"
> log4j: Setting property [conversionPattern] to [%d{ABSOLUTE} %-5p [%c{1}]
> %m%n].
> log4j: setFile called: out/learning.log, false
> log4j: setFile ended
> log4j: Adding appender named [fileAppender] to category [root].
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/hadoop/conf/Configuration
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:191)
> at org.apache.samza.job.JobRunner.run(JobRunner.scala:56)
> at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
> at org.apache.samza.job.JobRunner.main(JobRunner.scala)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.hadoop.conf.Configuration
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> ... 5 more
>
> I guess there is a problem with the job package but I am not sure how to
> solve it.
>
> Thanks,
>
> Jordi
> 
> Jordi Blasi Uribarri
> Área I+D+i
>
> jbl...@nextel.es
> Oficina Bilbao
>
> [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2014.png]
>


Re: Error running integration tests

2015-03-25 Thread Roger Hoover
That's what I was wondering.  I can turn it on if necessary.

Sent from my iPhone

> On Mar 25, 2015, at 5:15 PM, Chris Riccomini  wrote:
> 
> Hey Roger,
> 
> Are you able to run this command?
> 
>  ssh localhost
> 
> This is effectively what Zopkio is doing. Wondering if you need to enable
> SSH on your laptop? I have "remote login" enabled on my OSX laptop.
> 
> Cheers,
> Chris
> 
> On Wed, Mar 25, 2015 at 4:29 PM, Roger Hoover 
> wrote:
> 
>> Do I need to bring up sshd on my laptop or can the tests be made to not
>> ssh?
>> 
>> On Wed, Mar 25, 2015 at 4:27 PM, Roger Hoover 
>> wrote:
>> 
>>> Hi,
>>> 
>>> I wanted to see if I could run the integration tests on the 0.9.0 branch
>>> on my Mac.
>>> 
>>> I cloned the 0.9.0 branch from the github mirror, built everything
>>> (./gradlew clean build), and tried to run the integration tests.
>>> 
>>> ./bin/integration-tests.sh /tmp/roger
>>> I get an error when the test script tries to deploy ZooKeeper using SSH.
>>> I'm running on Mac OS X.
>>> 
>>> Any suggestions?
>>> 
>>> Thanks,
>>> 
>>> Roger
>>> 
>>> 2015-03-25 16:11:40,368 zopkio.test_runner [ERROR] Aborting single
>>> execution due to setup_suite failure:
>>> 
>>> Traceback (most recent call last):
>>> 
>>>  File
>>> 
>> "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/test_runner.py",
>>> line 107, in run
>>> 
>>>self.deployment_module.setup_suite()
>>> 
>>>  File "/tmp/roger/scripts/deployment.py", line 76, in setup_suite
>>> 
>>>'hostname': host
>>> 
>>>  File
>>> 
>> "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/deployer.py",
>>> line 77, in deploy
>>> 
>>>self.install(unique_id, configs)
>>> 
>>>  File
>>> 
>> "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/adhoc_deployer.py",
>>> line 129, in install
>>> 
>>>with get_ssh_client(hostname, username=runtime.get_username(),
>>> password=runtime.get_password()) as ssh:
>>> 
>>>  File
>>> 
>> "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/contextlib.py",
>>> line 17, in __enter__
>>> 
>>>return self.gen.next()
>>> 
>>>  File
>>> 
>> "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/remote_host_helper.py",
>>> line 204, in get_ssh_client
>>> 
>>>ssh.connect(hostname, username=username, password=password)
>>> 
>>>  File
>>> 
>> "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/client.py",
>>> line 251, in connect
>>> 
>>>retry_on_signal(lambda: sock.connect(addr))
>>> 
>>>  File
>>> 
>> "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/util.py",
>>> line 270, in retry_on_signal
>>> 
>>>return function()
>>> 
>>>  File
>>> 
>> "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/client.py",
>>> line 251, in <lambda>
>>> 
>>>retry_on_signal(lambda: sock.connect(addr))
>>> 
>>>  File
>>> 
>> "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/socket.py",
>>> line 224, in meth
>>> 
>>>return getattr(self._sock,name)(*args)
>>> 
>>> error: [Errno 61] Connection refused
>>> 
>>> 
>> 


Re: Error running integration tests

2015-03-25 Thread Roger Hoover
Do I need to bring up sshd on my laptop or can the tests be made to not ssh?

On Wed, Mar 25, 2015 at 4:27 PM, Roger Hoover 
wrote:

> Hi,
>
> I wanted to see if I could run the integration tests on the 0.9.0 branch
> on my Mac.
>
> I cloned the 0.9.0 branch from the github mirror, built everything
> (./gradlew clean build), and tried to run the integration tests.
>
> ./bin/integration-tests.sh /tmp/roger
> I get an error when the test script tries to deploy ZooKeeper using SSH.
> I'm running on Mac OS X.
>
> Any suggestions?
>
> Thanks,
>
> Roger
>
> 2015-03-25 16:11:40,368 zopkio.test_runner [ERROR] Aborting single
> execution due to setup_suite failure:
>
> Traceback (most recent call last):
>
>   File
> "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/test_runner.py",
> line 107, in run
>
> self.deployment_module.setup_suite()
>
>   File "/tmp/roger/scripts/deployment.py", line 76, in setup_suite
>
> 'hostname': host
>
>   File
> "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/deployer.py",
> line 77, in deploy
>
> self.install(unique_id, configs)
>
>   File
> "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/adhoc_deployer.py",
> line 129, in install
>
> with get_ssh_client(hostname, username=runtime.get_username(),
> password=runtime.get_password()) as ssh:
>
>   File
> "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/contextlib.py",
> line 17, in __enter__
>
> return self.gen.next()
>
>   File
> "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/remote_host_helper.py",
> line 204, in get_ssh_client
>
> ssh.connect(hostname, username=username, password=password)
>
>   File
> "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/client.py",
> line 251, in connect
>
> retry_on_signal(lambda: sock.connect(addr))
>
>   File
> "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/util.py",
> line 270, in retry_on_signal
>
> return function()
>
>   File
> "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/client.py",
> line 251, in <lambda>
>
> retry_on_signal(lambda: sock.connect(addr))
>
>   File
> "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/socket.py",
> line 224, in meth
>
> return getattr(self._sock,name)(*args)
>
> error: [Errno 61] Connection refused
>
>


Error running integration tests

2015-03-25 Thread Roger Hoover
Hi,

I wanted to see if I could run the integration tests on the 0.9.0 branch on
my Mac.

I cloned the 0.9.0 branch from the github mirror, built everything
(./gradlew clean build), and tried to run the integration tests.

./bin/integration-tests.sh /tmp/roger
I get an error when the test script tries to deploy ZooKeeper using SSH.
I'm running on Mac OS X.

Any suggestions?

Thanks,

Roger

2015-03-25 16:11:40,368 zopkio.test_runner [ERROR] Aborting single
execution due to setup_suite failure:

Traceback (most recent call last):

  File
"/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/test_runner.py",
line 107, in run

self.deployment_module.setup_suite()

  File "/tmp/roger/scripts/deployment.py", line 76, in setup_suite

'hostname': host

  File
"/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/deployer.py",
line 77, in deploy

self.install(unique_id, configs)

  File
"/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/adhoc_deployer.py",
line 129, in install

with get_ssh_client(hostname, username=runtime.get_username(),
password=runtime.get_password()) as ssh:

  File
"/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/contextlib.py",
line 17, in __enter__

return self.gen.next()

  File
"/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/remote_host_helper.py",
line 204, in get_ssh_client

ssh.connect(hostname, username=username, password=password)

  File
"/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/client.py",
line 251, in connect

retry_on_signal(lambda: sock.connect(addr))

  File
"/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/util.py",
line 270, in retry_on_signal

return function()

  File
"/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/client.py",
line 251, in <lambda>

retry_on_signal(lambda: sock.connect(addr))

  File
"/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/socket.py",
line 224, in meth

return getattr(self._sock,name)(*args)

error: [Errno 61] Connection refused


Re: Log error deploying on YARN [Samza 0.8.0]

2015-03-24 Thread Roger Hoover
Ah, yes. That's it!  Thanks, Chris.

On Tue, Mar 24, 2015 at 2:30 PM, Chris Riccomini 
wrote:

> Hey Roger,
>
> You're likely hitting this issue:
>
>   https://issues.apache.org/jira/browse/SAMZA-456
>
> Can you have a look and see if that's the problem? We missed some JARs that
> need to be put into the YARN NM classpath.
>
> Cheers,
> Chris
>
> On Tue, Mar 24, 2015 at 2:22 PM, Roger Hoover 
> wrote:
>
> > Hi all,
> >
> > I'm new to YARN and trying to have YARN download the Samza job tarball (
> > https://samza.apache.org/learn/tutorials/0.8/run-in-multi-node-yarn.html
> ).
> > From the log, it seems that the download failed.  I've tested that the
> file
> > is available via curl.  The error message is:
> > org/apache/samza/util/Logging
> >
> > I appreciate any suggestions.
> >
> > Roger
> >
> >
> > 2015-03-24 17:13:05,469 INFO  [Socket Reader #1 for port 33749]
> ipc.Server
> > (Server.java:saslProcess(1294)) - Auth successful for
> > appattempt_1427226422217_0005_02 (auth:SIMPLE)
> >
> > 2015-03-24 17:13:05,473 INFO  [IPC Server handler 15 on 33749]
> > containermanager.ContainerManagerImpl
> > (ContainerManagerImpl.java:startContainerInternal(572)) - Start request
> for
> > container_1427226422217_0005_02_01 by user opintel
> >
> > 2015-03-24 17:13:05,473 INFO  [IPC Server handler 15 on 33749]
> > nodemanager.NMAuditLogger (NMAuditLogger.java:logSuccess(89)) -
> > USER=opintel
> > IP=10.53.152.54 OPERATION=Start Container Request
> > TARGET=ContainerManageImpl
> > RESULT=SUCCESS APPID=application_1427226422217_0005
> > CONTAINERID=container_1427226422217_0005_02_01
> >
> > 2015-03-24 17:13:05,473 INFO  [AsyncDispatcher event handler]
> > application.Application (ApplicationImpl.java:transition(296)) - Adding
> > container_1427226422217_0005_02_01 to application
> > application_1427226422217_0005
> >
> > 2015-03-24 17:13:05,474 INFO  [AsyncDispatcher event handler]
> > container.Container (ContainerImpl.java:handle(884)) - Container
> > container_1427226422217_0005_02_01 transitioned from NEW to
> LOCALIZING
> >
> > 2015-03-24 17:13:05,474 INFO  [AsyncDispatcher event handler]
> > containermanager.AuxServices (AuxServices.java:handle(175)) - Got event
> > CONTAINER_INIT for appId application_1427226422217_0005
> >
> > 2015-03-24 17:13:05,475 INFO  [AsyncDispatcher event handler]
> > localizer.LocalizedResource (LocalizedResource.java:handle(196)) -
> Resource
> > http://somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz transitioned
> from
> > INIT to DOWNLOADING
> >
> > 2015-03-24 17:13:05,475 INFO  [AsyncDispatcher event handler]
> > localizer.ResourceLocalizationService
> > (ResourceLocalizationService.java:handle(596)) - Created localizer for
> > container_1427226422217_0005_02_01
> >
> > 2015-03-24 17:13:05,480 INFO  [LocalizerRunner for
> > container_1427226422217_0005_02_01]
> > localizer.ResourceLocalizationService
> > (ResourceLocalizationService.java:writeCredentials(1029)) - Writing
> > credentials to the nmPrivate file
> >
> >
> /tmp/hadoop-opintel/nm-local-dir/nmPrivate/container_1427226422217_0005_02_01.tokens.
> > Credentials list:
> >
> > 2015-03-24 17:13:05,481 INFO  [LocalizerRunner for
> > container_1427226422217_0005_02_01]
> > nodemanager.DefaultContainerExecutor
> > (DefaultContainerExecutor.java:createUserCacheDirs(469)) - Initializing
> > user opintel
> >
> > 2015-03-24 17:13:05,492 INFO  [LocalizerRunner for
> > container_1427226422217_0005_02_01]
> > nodemanager.DefaultContainerExecutor
> > (DefaultContainerExecutor.java:startLocalizer(103)) - Copying from
> >
> >
> /tmp/hadoop-opintel/nm-local-dir/nmPrivate/container_1427226422217_0005_02_01.tokens
> > to
> >
> >
> /tmp/hadoop-opintel/nm-local-dir/usercache/opintel/appcache/application_1427226422217_0005/container_1427226422217_0005_02_01.tokens
> >
> > 2015-03-24 17:13:05,492 INFO  [LocalizerRunner for
> > container_1427226422217_0005_02_01]
> > nodemanager.DefaultContainerExecutor
> > (DefaultContainerExecutor.java:startLocalizer(105)) - CWD set to
> >
> >
> /tmp/hadoop-opintel/nm-local-dir/usercache/opintel/appcache/application_1427226422217_0005
> > =
> >
> >
> file:/tmp/hadoop-opintel/nm-local-dir/usercache/opintel/appcache/application_1427226422217_0005
> >
> > 2015-03-24 17:13:05,520 INFO  [IPC Server han

Log error deploying on YARN [Samza 0.8.0]

2015-03-24 Thread Roger Hoover
Hi all,

I'm new to YARN and trying to have YARN download the Samza job tarball (
https://samza.apache.org/learn/tutorials/0.8/run-in-multi-node-yarn.html).
From the log, it seems that the download failed.  I've tested that the file
is available via curl.  The error message is:  org/apache/samza/util/Logging

I appreciate any suggestions.

Roger


2015-03-24 17:13:05,469 INFO  [Socket Reader #1 for port 33749] ipc.Server
(Server.java:saslProcess(1294)) - Auth successful for
appattempt_1427226422217_0005_02 (auth:SIMPLE)

2015-03-24 17:13:05,473 INFO  [IPC Server handler 15 on 33749]
containermanager.ContainerManagerImpl
(ContainerManagerImpl.java:startContainerInternal(572)) - Start request for
container_1427226422217_0005_02_01 by user opintel

2015-03-24 17:13:05,473 INFO  [IPC Server handler 15 on 33749]
nodemanager.NMAuditLogger (NMAuditLogger.java:logSuccess(89)) - USER=opintel
IP=10.53.152.54 OPERATION=Start Container Request TARGET=ContainerManageImpl
RESULT=SUCCESS APPID=application_1427226422217_0005
CONTAINERID=container_1427226422217_0005_02_01

2015-03-24 17:13:05,473 INFO  [AsyncDispatcher event handler]
application.Application (ApplicationImpl.java:transition(296)) - Adding
container_1427226422217_0005_02_01 to application
application_1427226422217_0005

2015-03-24 17:13:05,474 INFO  [AsyncDispatcher event handler]
container.Container (ContainerImpl.java:handle(884)) - Container
container_1427226422217_0005_02_01 transitioned from NEW to LOCALIZING

2015-03-24 17:13:05,474 INFO  [AsyncDispatcher event handler]
containermanager.AuxServices (AuxServices.java:handle(175)) - Got event
CONTAINER_INIT for appId application_1427226422217_0005

2015-03-24 17:13:05,475 INFO  [AsyncDispatcher event handler]
localizer.LocalizedResource (LocalizedResource.java:handle(196)) - Resource
http://somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz transitioned from
INIT to DOWNLOADING

2015-03-24 17:13:05,475 INFO  [AsyncDispatcher event handler]
localizer.ResourceLocalizationService
(ResourceLocalizationService.java:handle(596)) - Created localizer for
container_1427226422217_0005_02_01

2015-03-24 17:13:05,480 INFO  [LocalizerRunner for
container_1427226422217_0005_02_01]
localizer.ResourceLocalizationService
(ResourceLocalizationService.java:writeCredentials(1029)) - Writing
credentials to the nmPrivate file
/tmp/hadoop-opintel/nm-local-dir/nmPrivate/container_1427226422217_0005_02_01.tokens.
Credentials list:

2015-03-24 17:13:05,481 INFO  [LocalizerRunner for
container_1427226422217_0005_02_01]
nodemanager.DefaultContainerExecutor
(DefaultContainerExecutor.java:createUserCacheDirs(469)) - Initializing
user opintel

2015-03-24 17:13:05,492 INFO  [LocalizerRunner for
container_1427226422217_0005_02_01]
nodemanager.DefaultContainerExecutor
(DefaultContainerExecutor.java:startLocalizer(103)) - Copying from
/tmp/hadoop-opintel/nm-local-dir/nmPrivate/container_1427226422217_0005_02_01.tokens
to
/tmp/hadoop-opintel/nm-local-dir/usercache/opintel/appcache/application_1427226422217_0005/container_1427226422217_0005_02_01.tokens

2015-03-24 17:13:05,492 INFO  [LocalizerRunner for
container_1427226422217_0005_02_01]
nodemanager.DefaultContainerExecutor
(DefaultContainerExecutor.java:startLocalizer(105)) - CWD set to
/tmp/hadoop-opintel/nm-local-dir/usercache/opintel/appcache/application_1427226422217_0005
=
file:/tmp/hadoop-opintel/nm-local-dir/usercache/opintel/appcache/application_1427226422217_0005

2015-03-24 17:13:05,520 INFO  [IPC Server handler 4 on 8040]
localizer.ResourceLocalizationService
(ResourceLocalizationService.java:update(928)) - DEBUG: FAILED { http://
somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz, 0, ARCHIVE, null },
org/apache/samza/util/Logging

2015-03-24 17:13:05,520 INFO  [IPC Server handler 4 on 8040]
localizer.LocalizedResource (LocalizedResource.java:handle(196)) - Resource
http://somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz transitioned from
DOWNLOADING to FAILED

2015-03-24 17:13:05,520 INFO  [AsyncDispatcher event handler]
container.Container (ContainerImpl.java:handle(884)) - Container
container_1427226422217_0005_02_01 transitioned from LOCALIZING to
LOCALIZATION_FAILED

2015-03-24 17:13:05,521 INFO  [AsyncDispatcher event handler]
localizer.LocalResourcesTrackerImpl
(LocalResourcesTrackerImpl.java:handle(137)) - Container
container_1427226422217_0005_02_01 sent RELEASE event on a resource
request { http://somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz, 0,
ARCHIVE, null } not present in cache.

2015-03-24 17:13:05,521 WARN  [LocalizerRunner for
container_1427226422217_0005_02_01] ipc.Client (Client.java:call(1388))
- interrupted waiting to send rpc request to server

java.lang.InterruptedException

at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:400)

at java.util.concurrent.FutureTask.get(FutureTask.java:187)

at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1029)

at org.apa

Re: Kafka topic naming conventions

2015-03-19 Thread Roger Hoover
Renato,

Thanks for the link.  Some interesting suggestions there as well.
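
For concreteness, one hypothetical shape for the convention discussed below (the
partition-key field and partition count encoded in the topic name) could look like
this; the format is made up purely for illustration and isn't standardized anywhere
in Kafka or Samza:

public final class TopicNames {
  private TopicNames() {}

  // e.g. topicName("pageviews", "user_id", 16) -> "pageviews.by_user_id.16p"
  public static String topicName(String dataset, String partitionKeyField, int partitions) {
    if (partitions <= 0) {
      throw new IllegalArgumentException("partitions must be positive");
    }
    return dataset + ".by_" + partitionKeyField + "." + partitions + "p";
  }
}

The caveat raised further down the thread still applies: the encoded count goes stale
if the topic is ever expanded, so this works best when expansion means creating a new
topic.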

On Thu, Mar 19, 2015 at 2:28 AM, Renato Marroquín Mogrovejo <
renatoj.marroq...@gmail.com> wrote:

> There was an interesting discussion over in the kafka mailing list that
> might give you more ideas Roger.
> Although they don't mention anything about the number of partitions when
> doing so, anyways maybe it helps.
>
>
> Renato M.
>
> [1] https://www.mail-archive.com/users@kafka.apache.org/msg11976.html
>
> 2015-03-19 5:43 GMT+01:00 Roger Hoover :
>
> > Thanks, guys.  I was also playing around with including partition count
> and
> > even the partition key in the topic name.   My thought was that topics
> may
> > have the same data and number of partitions but only differ by partition
> > key.  After a while, the naming does get crazy (too long and ugly).  We
> > really need a topic metadata store.
> >
> > On Wed, Mar 18, 2015 at 6:21 PM, Chinmay Soman <
> chinmay.cere...@gmail.com>
> > wrote:
> >
> > > Yeah ! It does seem a bit hackish - but I think this approach promises
> > less
> > > config/operation errors.
> > >
> > > Although I think some of these checks can be built within Samza -
> > assuming
> > > Kafka has a metadata store in the near future - the Samza container can
> > > validate the #topics against this store.
> > >
> > > On Wed, Mar 18, 2015 at 6:16 PM, Chris Riccomini <
> criccom...@apache.org>
> > > wrote:
> > >
> > > > Hey Chinmay,
> > > >
> > > > Cool, this is good feedback. I didn't think I was *that* crazy. :)
> > > >
> > > > Cheers,
> > > > Chris
> > > >
> > > > On Wed, Mar 18, 2015 at 6:10 PM, Chinmay Soman <
> > > chinmay.cere...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thats what we're doing as well - appending partition count to the
> > kafka
> > > > > topic name. This actually helps keep track of the #partitions for
> > each
> > > > > topic (since Kafka doesn't have a Metadata store yet).
> > > > >
> > > > > In case of topic expansion - we actually just resort to creating a
> > new
> > > > > topic. Although that is an overhead - the thought process is that
> > this
> > > > will
> > > > > minimize operational errors. Also, this is necessary to do in case
> > > we're
> > > > > doing some kind of joins.
> > > > >
> > > > >
> > > > > On Wed, Mar 18, 2015 at 5:59 PM, Jakob Homan 
> > > wrote:
> > > > >
> > > > > > On 18 March 2015 at 17:48, Chris Riccomini <
> criccom...@apache.org>
> > > > > wrote:
> > > > > > > One thing I haven't seen, but might be relevant, is including
> > > > partition
> > > > > > > counts in the topic.
> > > > > >
> > > > > > Yeah, but then if you change the partition count later on, you've
> > got
> > > > > > incorrect information forever. Or you need to create a new
> stream,
> > > > > > which might be a nice forcing function to make sure your join
> isn't
> > > > > > screwed up.  There'd need to be something somewhere to enforce
> that
> > > > > > though.
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Thanks and regards
> > > > >
> > > > > Chinmay Soman
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks and regards
> > >
> > > Chinmay Soman
> > >
> >
>

