Kafka topic naming conventions

2015-03-18 Thread Roger Hoover
Hi,

Wondering what naming conventions people are using for topics in Kafka.
When there's re-partitioning involved, you can end up with multiple topics
that have the exact same data but are partitioned differently.  How do you
name them?

Thanks,

Roger


Re: Kafka topic naming conventions

2015-03-18 Thread Roger Hoover
Thanks, guys.  I was also playing around with including the partition count and
even the partition key in the topic name.  My thought was that topics may
have the same data and number of partitions but only differ by partition
key.  After a while, the naming does get crazy (too long and ugly).  We
really need a topic metadata store.
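For the archives, the flavor of convention I was playing with looks roughly
like the sketch below; the domain/dataset/key fields are made up for
illustration, not any kind of standard:

public final class TopicNames {
    // Hypothetical convention: <domain>.<dataset>.by_<partitionKey>.<partitionCount>
    public static String of(String domain, String dataset,
                            String partitionKey, int partitionCount) {
        return String.format("%s.%s.by_%s.%d",
                domain, dataset, partitionKey, partitionCount);
    }
}

// TopicNames.of("activity", "pageviews", "user_id", 16)
//   -> "activity.pageviews.by_user_id.16"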

On Wed, Mar 18, 2015 at 6:21 PM, Chinmay Soman chinmay.cere...@gmail.com
wrote:

 Yeah! It does seem a bit hackish - but I think this approach promises fewer
 config/operation errors.

 Although I think some of these checks can be built within Samza - assuming
 Kafka has a metadata store in the near future - the Samza container can
 validate the #topics against this store.

 On Wed, Mar 18, 2015 at 6:16 PM, Chris Riccomini criccom...@apache.org
 wrote:

  Hey Chinmay,
 
  Cool, this is good feedback. I didn't think I was *that* crazy. :)
 
  Cheers,
  Chris
 
  On Wed, Mar 18, 2015 at 6:10 PM, Chinmay Soman 
 chinmay.cere...@gmail.com
  wrote:
 
    That's what we're doing as well - appending the partition count to the
    Kafka topic name.  This actually helps keep track of the #partitions for
    each topic (since Kafka doesn't have a metadata store yet).

    In case of topic expansion, we actually just resort to creating a new
    topic.  Although that is an overhead, the thought process is that this
    will minimize operational errors.  Also, this is necessary to do in case
    we're doing some kind of join.
  
  
   On Wed, Mar 18, 2015 at 5:59 PM, Jakob Homan jgho...@gmail.com
 wrote:
  
     On 18 March 2015 at 17:48, Chris Riccomini criccom...@apache.org wrote:
      One thing I haven't seen, but might be relevant, is including partition
      counts in the topic.

     Yeah, but then if you change the partition count later on, you've got
     incorrect information forever. Or you need to create a new stream,
     which might be a nice forcing function to make sure your join isn't
     screwed up.  There'd need to be something somewhere to enforce that
     though.
   
  
  
  
   --
   Thanks and regards
  
   Chinmay Soman
  
 



 --
 Thanks and regards

 Chinmay Soman



Re: Log error deploying on YARN [Samza 0.8.0]

2015-03-24 Thread Roger Hoover
Ah, yes. That's it!  Thanks, Chris.

On Tue, Mar 24, 2015 at 2:30 PM, Chris Riccomini criccom...@apache.org
wrote:

 Hey Roger,

 You're likely hitting this issue:

   https://issues.apache.org/jira/browse/SAMZA-456

 Can you have a look and see if that's the problem? We missed some JARs that
 need to be put into the YARN NM classpath.
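 For context, the http-based localization setup from the multi-node tutorial
 hinges on roughly this core-site.xml entry on each NM (treat this as a
 sketch; SAMZA-456 has the authoritative details). The class it names is what
 drags in the missing Samza classes, so those JARs have to be on the NM
 classpath:

   <property>
     <name>fs.http.impl</name>
     <value>org.apache.samza.util.hadoop.HttpFileSystem</value>
   </property>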

 Cheers,
 Chris

 On Tue, Mar 24, 2015 at 2:22 PM, Roger Hoover roger.hoo...@gmail.com
 wrote:

  Hi all,
 
  I'm new to YARN and trying to have YARN download the Samza job tarball (
  https://samza.apache.org/learn/tutorials/0.8/run-in-multi-node-yarn.html
 ).
   From the log, it seems that the download failed.  I've tested that the
   file is available via curl.  The error message is:
   org/apache/samza/util/Logging
 
  I appreciate any suggestions.
 
  Roger
 
 
  2015-03-24 17:13:05,469 INFO  [Socket Reader #1 for port 33749]
 ipc.Server
  (Server.java:saslProcess(1294)) - Auth successful for
  appattempt_1427226422217_0005_02 (auth:SIMPLE)
 
  2015-03-24 17:13:05,473 INFO  [IPC Server handler 15 on 33749]
  containermanager.ContainerManagerImpl
  (ContainerManagerImpl.java:startContainerInternal(572)) - Start request
 for
  container_1427226422217_0005_02_01 by user opintel
 
  2015-03-24 17:13:05,473 INFO  [IPC Server handler 15 on 33749]
  nodemanager.NMAuditLogger (NMAuditLogger.java:logSuccess(89)) -
  USER=opintel
  IP=10.53.152.54 OPERATION=Start Container Request
  TARGET=ContainerManageImpl
  RESULT=SUCCESS APPID=application_1427226422217_0005
  CONTAINERID=container_1427226422217_0005_02_01
 
  2015-03-24 17:13:05,473 INFO  [AsyncDispatcher event handler]
  application.Application (ApplicationImpl.java:transition(296)) - Adding
  container_1427226422217_0005_02_01 to application
  application_1427226422217_0005
 
  2015-03-24 17:13:05,474 INFO  [AsyncDispatcher event handler]
  container.Container (ContainerImpl.java:handle(884)) - Container
  container_1427226422217_0005_02_01 transitioned from NEW to
 LOCALIZING
 
  2015-03-24 17:13:05,474 INFO  [AsyncDispatcher event handler]
  containermanager.AuxServices (AuxServices.java:handle(175)) - Got event
  CONTAINER_INIT for appId application_1427226422217_0005
 
  2015-03-24 17:13:05,475 INFO  [AsyncDispatcher event handler]
  localizer.LocalizedResource (LocalizedResource.java:handle(196)) -
 Resource
  http://somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz transitioned
 from
  INIT to DOWNLOADING
 
  2015-03-24 17:13:05,475 INFO  [AsyncDispatcher event handler]
  localizer.ResourceLocalizationService
  (ResourceLocalizationService.java:handle(596)) - Created localizer for
  container_1427226422217_0005_02_01
 
  2015-03-24 17:13:05,480 INFO  [LocalizerRunner for
  container_1427226422217_0005_02_01]
  localizer.ResourceLocalizationService
  (ResourceLocalizationService.java:writeCredentials(1029)) - Writing
  credentials to the nmPrivate file
 
 
 /tmp/hadoop-opintel/nm-local-dir/nmPrivate/container_1427226422217_0005_02_01.tokens.
  Credentials list:
 
  2015-03-24 17:13:05,481 INFO  [LocalizerRunner for
  container_1427226422217_0005_02_01]
  nodemanager.DefaultContainerExecutor
  (DefaultContainerExecutor.java:createUserCacheDirs(469)) - Initializing
  user opintel
 
  2015-03-24 17:13:05,492 INFO  [LocalizerRunner for
  container_1427226422217_0005_02_01]
  nodemanager.DefaultContainerExecutor
  (DefaultContainerExecutor.java:startLocalizer(103)) - Copying from
 
 
 /tmp/hadoop-opintel/nm-local-dir/nmPrivate/container_1427226422217_0005_02_01.tokens
  to
 
 
 /tmp/hadoop-opintel/nm-local-dir/usercache/opintel/appcache/application_1427226422217_0005/container_1427226422217_0005_02_01.tokens
 
  2015-03-24 17:13:05,492 INFO  [LocalizerRunner for
  container_1427226422217_0005_02_01]
  nodemanager.DefaultContainerExecutor
  (DefaultContainerExecutor.java:startLocalizer(105)) - CWD set to
 
 
 /tmp/hadoop-opintel/nm-local-dir/usercache/opintel/appcache/application_1427226422217_0005
  =
 
 
 file:/tmp/hadoop-opintel/nm-local-dir/usercache/opintel/appcache/application_1427226422217_0005
 
  2015-03-24 17:13:05,520 INFO  [IPC Server handler 4 on 8040]
  localizer.ResourceLocalizationService
  (ResourceLocalizationService.java:update(928)) - DEBUG: FAILED { http://
  somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz, 0, ARCHIVE, null },
  org/apache/samza/util/Logging
 
  2015-03-24 17:13:05,520 INFO  [IPC Server handler 4 on 8040]
  localizer.LocalizedResource (LocalizedResource.java:handle(196)) -
 Resource
  http://somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz transitioned
 from
  DOWNLOADING to FAILED
 
  2015-03-24 17:13:05,520 INFO  [AsyncDispatcher event handler]
  container.Container (ContainerImpl.java:handle(884)) - Container
  container_1427226422217_0005_02_01 transitioned from LOCALIZING to
  LOCALIZATION_FAILED
 
  2015-03-24 17:13:05,521 INFO  [AsyncDispatcher event handler]
  localizer.LocalResourcesTrackerImpl

Re: Error running integration tests

2015-03-25 Thread Roger Hoover
Do I need to bring up sshd on my laptop, or can the tests be made not to use SSH?
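For anyone else hitting this on a Mac: assuming the zopkio deployer really
does SSH even for localhost deploys, enabling the built-in sshd looks like
this (same as Sharing > Remote Login in System Preferences):

sudo systemsetup -setremotelogin on
ssh localhost true   # sanity check that auth works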

On Wed, Mar 25, 2015 at 4:27 PM, Roger Hoover roger.hoo...@gmail.com
wrote:

 Hi,

 I wanted to see if I could run the integration tests on the 0.9.0 branch
 on my Mac.

 I cloned the 0.9.0 branch from the github mirror, built everything
 (./gradlew clean build), and tried to run the integration tests.

 ./bin/integration-tests.sh /tmp/roger
 I get an error when the test script tries to deploy ZooKeeper using SSH.
 I'm running on Mac OS X.

 Any suggestions?

 Thanks,

 Roger

 2015-03-25 16:11:40,368 zopkio.test_runner [ERROR] Aborting single
 execution due to setup_suite failure:

 Traceback (most recent call last):

   File
 /tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/test_runner.py,
 line 107, in run

 self.deployment_module.setup_suite()

   File /tmp/roger/scripts/deployment.py, line 76, in setup_suite

 'hostname': host

   File
 /tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/deployer.py,
 line 77, in deploy

 self.install(unique_id, configs)

   File
 /tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/adhoc_deployer.py,
 line 129, in install

 with get_ssh_client(hostname, username=runtime.get_username(),
 password=runtime.get_password()) as ssh:

   File
 /usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/contextlib.py,
 line 17, in __enter__

 return self.gen.next()

   File
 /tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/remote_host_helper.py,
 line 204, in get_ssh_client

 ssh.connect(hostname, username=username, password=password)

   File
 /tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/client.py,
 line 251, in connect

 retry_on_signal(lambda: sock.connect(addr))

   File
 /tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/util.py,
 line 270, in retry_on_signal

 return function()

   File
 /tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/client.py,
 line 251, in lambda

 retry_on_signal(lambda: sock.connect(addr))

   File
 /usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/socket.py,
 line 224, in meth

 return getattr(self._sock,name)(*args)

 error: [Errno 61] Connection refused




Error running integration tests

2015-03-25 Thread Roger Hoover
Hi,

I wanted to see if I could run the integration tests on the 0.9.0 branch on
my Mac.

I cloned the 0.9.0 branch from the github mirror, built everything
(./gradlew clean build), and tried to run the integration tests.

./bin/integration-tests.sh /tmp/roger
I get an error when the test script tries to deploy ZooKeeper using SSH.
I'm running on Mac OS X.

Any suggestions?

Thanks,

Roger

2015-03-25 16:11:40,368 zopkio.test_runner [ERROR] Aborting single
execution due to setup_suite failure:

Traceback (most recent call last):

  File
/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/test_runner.py,
line 107, in run

self.deployment_module.setup_suite()

  File /tmp/roger/scripts/deployment.py, line 76, in setup_suite

'hostname': host

  File
/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/deployer.py,
line 77, in deploy

self.install(unique_id, configs)

  File
/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/adhoc_deployer.py,
line 129, in install

with get_ssh_client(hostname, username=runtime.get_username(),
password=runtime.get_password()) as ssh:

  File
/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/contextlib.py,
line 17, in __enter__

return self.gen.next()

  File
/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/remote_host_helper.py,
line 204, in get_ssh_client

ssh.connect(hostname, username=username, password=password)

  File
/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/client.py,
line 251, in connect

retry_on_signal(lambda: sock.connect(addr))

  File
/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/util.py,
line 270, in retry_on_signal

return function()

  File
/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/client.py,
line 251, in lambda

retry_on_signal(lambda: sock.connect(addr))

  File
/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/socket.py,
line 224, in meth

return getattr(self._sock,name)(*args)

error: [Errno 61] Connection refused


Re: How do you serve the data computed by Samza?

2015-03-31 Thread Roger Hoover
Ah, thanks for the great explanation.  Any particular reason that the
job(s) you described should not be Samza jobs?

We've started experimenting with such jobs for Druid and Elasticsearch.
For Elasticsearch, the Samza job containers join the Elasticsearch cluster
as transport nodes and use the Java API to push to the ES data nodes.  Likewise
for Druid, the Samza job uses the Tranquility API to schedule jobs (
https://github.com/metamx/tranquility/tree/master/src/main/scala/com/metamx/tranquility/samza
).
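On the Samza side, the job itself stays a plain StreamTask; a rough sketch of
the shape (the "es" system and stream names are made up, and the actual
Elasticsearch/Druid client wiring lives behind a custom SystemProducer):

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class EnrichAndIndexTask implements StreamTask {
  // Hypothetical output system/stream; "es" would be backed by a custom SystemProducer
  private static final SystemStream ES = new SystemStream("es", "user-activity");

  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                      TaskCoordinator coordinator) {
    Object enriched = envelope.getMessage();  // enrichment/join logic elided
    collector.send(new OutgoingMessageEnvelope(ES, envelope.getKey(), enriched));
  }
}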

The nice part about push versus pull is that the downstream system does not
need plugins (like ES rivers) that may complicate its configuration or
destabilize the system.

Cheers,

Roger

On Tue, Mar 31, 2015 at 10:56 AM, Felix GV fville...@linkedin.com.invalid
wrote:

 Thanks for your reply Roger! Very insightful (:

  6. If there was a highly-optimized and reliable way of ingesting
  partitioned streams quickly into your online serving system, would that
  help you leverage Samza more effectively?

  6. Can you elaborate please?

 Sure. The feature set I have in mind is the following:

   *   Provide a thinly-wrapped Kafka producer which does appropriate
 partitioning and includes useful metadata (such as production timestamp,
 etc.) alongside the payload. This producer would be used in the last step
 of processing of a Samza topology, in order to emit to Kafka some
 processed/joined/enriched data which is destined for online serving.
   *   Provide a consumer process which can be co-located on the same hosts
 as your data serving system. This process consumes from the appropriate
 partitions and checkpoints its offsets on its own. It leverages Kafka
 batching and compression to make consumption very efficient.
   *   For each record, the consumer process issues a put/insert locally to
 the co-located serving process. Since this is a local operation, it is also
 very cheap and efficient.
   *   The consumer process can also optionally throttle its insertion rate
 by monitoring some performance metrics of the co-located data serving
 process. For example, if the data serving process exposes a p99 latency via
 JMX or other means, this can be used in a tight feedback loop to back off
 if read latency degrades beyond a certain threshold (see the sketch after
 this list).
   *   This ingestion platform should be easy to integrate with any
 consistently-routed data serving system, by implementing some simple
 interfaces to let the ingestion system understand the key-to-partition
 assignment strategy, as well as the partition-to-node assignment strategy.
 Optionally, a hook to access performance metrics could also be implemented
 if throttling is deemed important (as described in the previous point).
   *   Since the consumer process lives in a separate process, the system
 benefits from good isolation guarantees. The consumer process can be capped
 to a low amount of heap, and its GC is inconsequential for the serving
 platform. It's also possible to bounce the consumer and data serving
 processes independently of each other, if need be.

 There are some more nuances and additional features which could be nice to
 have, but that's the general idea.
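
 To make the throttling point concrete, here is a toy sketch of the feedback
 loop; every interface in it (LatencyProbe, StoreClient, BatchSource) is a
 made-up stand-in for the real consumer and serving-node APIs:

 interface LatencyProbe { double p99Millis(); }                 // e.g. sampled via JMX
 interface StoreClient { void put(byte[] key, byte[] value); }  // co-located serving node
 interface BatchSource { java.util.List<byte[][]> nextBatch(); }  // wraps a Kafka consumer

 class ThrottledIngester {
   static final double LATENCY_SLO_MS = 50.0;

   static void run(BatchSource source, StoreClient store, LatencyProbe probe)
       throws InterruptedException {
     while (true) {
       // Back off while the local node's read latency is above the threshold
       while (probe.p99Millis() > LATENCY_SLO_MS) {
         Thread.sleep(100);
       }
       for (byte[][] kv : source.nextBatch()) {
         store.put(kv[0], kv[1]);  // local put, near-instantaneous ack
       }
       // Checkpointing the consumed offsets after each batch is elided
     }
   }
 }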


 It seems to me like such a system would be valuable, but I'm wondering what
 other people in the open-source community think, hence why I was interested
 in starting this thread...


 Thanks for your feedback!

 -F



Re: [VOTE] Apache Samza 0.9.0 RC0

2015-03-31 Thread Roger Hoover
Nice.  Thanks Yan!

On Tue, Mar 31, 2015 at 3:24 PM, Yan Fang yanfang...@gmail.com wrote:

 Cool.

 * Published to Maven; it's already there.
 * Uploaded to dist/release. It may take a while for mirrors to pick it up.
 * Updated the download page in
 https://issues.apache.org/jira/browse/SAMZA-624
 ** Will publish the website after mirrors pick up the 0.9.0 release
 ** In terms of the blog, it seems I don't have access?

 Thanks,

 Fang, Yan
 yanfang...@gmail.com

 On Tue, Mar 31, 2015 at 1:36 PM, Chris Riccomini criccom...@apache.org
 wrote:

  Hey Yan/Jakob,
 
  Awesome, thanks! Yan, feel free to finish up the release. :) Very cool!
 
  Cheers,
  Chris
 
  On Tue, Mar 31, 2015 at 1:27 PM, Jakob Homan jgho...@gmail.com wrote:
 
   Correct.  All that's necessary for a release is a
   more-+1s-than--1s-from-PMC-members vote, and then we can go ahead with
   distribution, publicity, etc.
   -jg
  
   On 31 March 2015 at 12:44, Chris Riccomini criccom...@apache.org
  wrote:
Hey Yan,
   
 Let's confirm with Jakob. I *think* we don't need any intervention from
 Apache. We should be able to move forward with the release. @Jakob, can you
 confirm this?
   
Cheers,
Chris
   
On Tue, Mar 31, 2015 at 11:17 AM, Yan Fang yanfang...@gmail.com
  wrote:
   
Hi all,
   
 After 72+ hours, we got +4 binding votes (Chris, Jakob, Chinmay, Yan),
 +2 non-binding votes (Roger, Yi Pan). The release vote passes.
   
 @Chris, do we need the vote from the Apache general mailing list? Or can I
 go ahead and update the release dist, update the download page, publish
 0.9.0 binaries to Maven, and write the 0.9.0 blog post?
   
Thanks,
Fang, Yan
yanfang...@gmail.com
   
 On Tue, Mar 31, 2015 at 9:21 AM, Ash W Matheson ash.mathe...@gmail.com
 wrote:
   
 I'd say yes, it's been a few days with little traffic on the topic.
On Mar 31, 2015 9:18 AM, Chris Riccomini criccom...@apache.org
   wrote:
   
 Hey all,

 Is the vote done?

 Cheers,
 Chris

 On Mon, Mar 30, 2015 at 2:10 PM, Chris Riccomini 
   criccom...@apache.org

 wrote:

  +1
 
  1. Validated hello-samza works with 0.9.0 Maven binaries.
  2. Validated release-0.9.0-rc0 tag exists and has correct checksums.
  3. Validated source release tarball builds, and has correct licenses
  (bin/check-all.sh).
  4. Validated source release tarball validates against Yan's PGP key.
  5. Ran rolling bounce of Kafka cluster with large job (1 million+
  messages/sec)
  6. Ran Zopkio integration tests, and SAMZA-394 torture test.
 
  For (6), I ran the SAMZA-394 tests for over 72 hours with the torture test
  running. No consistency/data loss issues! I did find an issue with the
  checker integration test, but I think it's best left for 0.10.0, so I'll
  open a JIRA to track that.
 
 
  On Mon, Mar 30, 2015 at 10:49 AM, Roger Hoover 
roger.hoo...@gmail.com
  wrote:
 
  +1
 
  * Created and tested a sample job doing a join
  * Built packages
  * Couldn't get integration tests to work.  It seemed like it was timing
  out trying to download dependencies.  I may have been having network
  issues last night.
 
  Cheers,
 
  Roger
 
  On Sun, Mar 29, 2015 at 9:09 PM, Jakob Homan 
 jgho...@gmail.com
  
wrote:
 
   +1 (binding)
  
   * Verified sig and checksum
   * Spot checked files, verified license and notice
   * Built packages
   * Ran hello samza
  
   Good work, Yan.
   -jg
  
  
   On 29 March 2015 at 13:08, Chinmay Soman 
chinmay.cere...@gmail.com
   wrote:
+1
   
* Verified signature against the release
* Verified authenticity of the key
 * Ran hello-samza (latest branch) - all 3 jobs succeed against the 0.9.0
 release
 * Ran integration tests - all 3 tests pass (after addressing SAMZA-621.
 Also, once a failure occurs I have to manually kill the YARN daemons.
 Not sure if there's a ticket open for that - a quick search did not reveal
 anything).
   
   
Good job guys !
   
On Sat, Mar 28, 2015 at 1:36 AM, Yan Fang 
   yanfang...@gmail.com
  wrote:
   
Hi Chris and Jakob,
   
 Sure. Let's do the voting until Monday. Hope more guys have time to try
 and validate the 0.9.0 version.
   
Thanks,
   
Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108
   
On Fri, Mar 27, 2015 at 5:09 PM, Chris Riccomini 
  criccom...@apache.org
   
wrote:
   
 Hey Yan,

  Yea, could we delay until Monday? I have been doing a lot of burn-in,
  and have found some issues

Re: How do you serve the data computed by Samza?

2015-03-31 Thread Roger Hoover
Hi Felix,

1,3. We're experimenting with both Druid and Elasticsearch for this.  We're
using Samza to enrich user activity and system performance events then
index them in Druid +/or Elasticsearch depending on the use case.
2. These are internal BI/Operations applications
4. We're still getting up to speed on both Druid and Elasticsearch to get
the necessary write throughput.  Read throughput has not been an issue.
5. Not yet, but I don't expect it will be easy
6. Can you elaborate please?

Cheers,

Roger

On Fri, Mar 27, 2015 at 9:52 AM, Felix GV fville...@linkedin.com.invalid
wrote:

 Hi Samza devs, users and enthusiasts,

 I've kept an eye on the Samza project for a while and I think it's super
 cool! I hope it continues to mature and expand as it seems very promising (:

 One thing I've been wondering for a while is: how do people serve the data
 they computed on Samza? More specifically:

   1.  How do you expose the output of Samza jobs to online applications
 that need low-latency reads?
   2.  Are these online apps mostly internal (i.e.: analytics, dashboards,
 etc.) or public/user-facing?
   3.  What systems do you currently use (or plan to use in the short-term)
 to host the data generated in Samza? HBase? Cassandra? MySQL? Druid? Others?
   4.  Are you satisfied or are you facing challenges in terms of the write
 throughput supported by these storage/serving systems? What about read
 throughput?
   5.  Are there situations where you wish to re-process all historical
 data when making improvements to your Samza job, which results in the need
 to re-ingest all of the Samza output into your online serving system (as
 described in the Kappa Architecture
 http://radar.oreilly.com/2014/07/questioning-the-lambda-architecture.html)
 ? Is this easy breezy or painful? Do you need to throttle it lest your
 serving system fall over?
   6.  If there was a highly-optimized and reliable way of ingesting
 partitioned streams quickly into your online serving system, would that
 help you leverage Samza more effectively?

 Your insights would be much appreciated!


 Thanks (:


 --
 Felix



Re: Re-processing a la Kappa/Liquid

2015-02-21 Thread Roger Hoover
Thanks, Jay.  This is one of the really nice advantages of local state in my 
mind.  Full retention would work but eventually run out of space, right?  
Ideally, Kafka would guarantee to keep dirty keys for a configurable amount of 
time as Chris suggested.
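
Concretely, "full retention" would just mean running the changelog topic with
age-based cleanup instead of compaction, something like the following (topic
name and retention period are illustrative):

bin/kafka-topics.sh --zookeeper zk:2181 --alter --topic my-store-changelog \
  --config cleanup.policy=delete \
  --config retention.ms=604800000   # keep 7 days of history instead of compacting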

Sent from my iPhone

 On Feb 21, 2015, at 10:10 AM, Jay Kreps jay.kr...@gmail.com wrote:
 
 Gotcha. Yes if you want to be able to join to past versions you definitely
 can't turn on compaction as the whole goal of that feature is to delete
 past versions. But wouldn't it work to use full retention if you want that
 (and use the MessageChooser interface during reprocessing if you want tight
 control over the state recreation). I mean you have the same dilemma if you
 don't use local state but instead use a remote store--the remote store
 likely only keeps the last version of each value so you can't join to the
 past.
 
 -Jay
 
 On Fri, Feb 20, 2015 at 9:04 PM, Roger Hoover roger.hoo...@gmail.com
 wrote:
 
 Jay,
 
 Sorry, I didn't explain it very well.  I'm talking about a stream-table
 join where the table comes from a compacted topic that is used to populate
 a local data store.  As the stream events are processed, they are joined
 with dimension data from the local store.
 
 If you want to kick off another version of this job that starts back in
 time, the new job cannot reliably recreate the same state of the local
 store that the original had because old values may have been compacted
 away.
 
 Does that make sense?
 
 Roger
 
 On Fri, Feb 20, 2015 at 2:52 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
 Hey Roger,
 
 I'm not sure if I understand the case you are describing.
 
 As Chris says we don't yet give you fined grained control over when
 history
 starts to disappear (though we designed with the intention of making that
 configurable later). However I'm not sure if you need that for the case
 you
 describe.
 
 Say you have a job J that takes inputs I1...IN and produces output
 O1...ON
 and in the process accumulates state in a topic S. I think the approach
 is
 to launch a J' (changed or improved in some way) that reprocesses I1...IN
 from the beginning of time (or some past point) into O1'...ON' and
 accumulates state in S'. So the state for J and the state for J' are
 totally independent. J' can't reuse J's state in general because the code
 that generates that state may have changed.
 
 -Jay
 
 On Thu, Feb 19, 2015 at 9:30 AM, Roger Hoover roger.hoo...@gmail.com
 wrote:
 
 Chris + Samza Devs,
 
 I was wondering whether Samza could support re-processing as described
 by
 the Kappa architecture or Liquid (
 http://www.cidrdb.org/cidr2015/Papers/CIDR15_Paper25u.pdf).
 
 It seems that a changelog is not sufficient to be able to restore state
 backward in time.  Kafka compaction will guarantee that local state can
 be
 restored from where it left off but I don't see how it can restore past
 state.
 
 Imagine the case where a stream job has a lot of state in it's local
 store
 but it has not updated any keys in a long time.
 
 Time t1: All of the data would be in the tail of the Kafka log (past
 the
 cleaner point).
 Time t2:  The job updates some keys.   Now we're in a state where the
 next
 compaction will blow away the old values for those keys.
 Time t3:  Compaction occurs and old values are discarded.
 
 Say we want to launch a re-processing job that would begin from t1.  If
 we
 launch that job before t3, it will correctly restore it's state.
 However,
 if we launch the job after t3, it will be missing old values, right?
 
 Unless I'm misunderstanding something, the only way around this is to
 keep
 snapshots in addition to the changelog.  Has there been any discussion
 of
 providing an option in Samza of taking RocksDB snapshots and persisting
 them to an object store or HDFS?
 
 Thanks,
 
 Roger
 


Re: Reprocessing and windowing

2015-02-23 Thread Roger Hoover
Hi Geoffry,

You might find the Google MillWheel paper and recent talk relevant.  That
system supports windows based on event time as well as reprocessing.
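
For the event-time binning itself, a rough sketch of the shape I'd try in a
Samza task; the timestamp extraction, bin size, and merge/emit logic are all
illustrative placeholders:

import java.util.HashMap;
import java.util.Map;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.*;

public class ScanBinningTask implements StreamTask, WindowableTask {
  private static final long BIN_MS = 5 * 60 * 1000L;
  // event-time bin start -> partially merged scan (merge logic elided)
  private final Map<Long, Object> bins = new HashMap<Long, Object>();

  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                      TaskCoordinator coordinator) {
    long eventTs = extractTimestamp(envelope.getMessage());  // from the packet, not wall clock
    bins.put(eventTs - (eventTs % BIN_MS), envelope.getMessage());
  }

  public void window(MessageCollector collector, TaskCoordinator coordinator) {
    // Emit and evict bins older than a watermark; for reprocessing, the
    // watermark has to be derived from the data, not from the wall clock.
  }

  private long extractTimestamp(Object msg) { return 0L; }  // hypothetical
}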

Sent from my iPhone

 On Feb 23, 2015, at 4:49 PM, Geoffry Sumter vit...@gmail.com wrote:
 
 Hey everyone,
 
 I've been thinking about reprocessing
 http://samza.apache.org/learn/documentation/0.7.0/jobs/reprocessing.html 
 when
 my job has windowed state
 http://samza.apache.org/learn/documentation/0.7.0/container/state-management.html#windowed-aggregation
 and
 I have a few questions.
 
  Context: I have a series of physical sensors that stream partial scans of
  their surroundings over the course of ~5-10 minutes (at the end of 5-10
  minutes it has provided a full scan of its surroundings and starts over from
  the start). Each packet of data has a timestamp that we're just going to
  have to trust is 'close enough.' When processing in real-time it's very
  natural to window the data every 5 minutes (wall clock) and merge into a
  complete view of their collective surroundings. For our purposes, old data
  arriving more than 5 minutes late is no longer useful for many applications.
 
 Now, I'd love to be able to reprocess data, especially by increasing
 parallelism and processing quickly, but this seems incompatible with using
 wall-clock-based windowed state. I could base my windowing/binning on the
 timestamps provided by my input data, but then I have to be careful to
 handle cases where some of my data may be arbitrarily delayed and the
 possibility that one partition will get significantly ahead of other ones
 (less interesting surroundings and faster computations) which makes waiting
 for a majority of partitions to get to a certain point in time difficult.
 
 Does anyone have experience with windowing and reprocessing? Any literature
 recommendations?
 
 Thanks!
 Geoffry


Log error deploying on YARN [Samza 0.8.0]

2015-03-24 Thread Roger Hoover
Hi all,

I'm new to YARN and trying to have YARN download the Samza job tarball (
https://samza.apache.org/learn/tutorials/0.8/run-in-multi-node-yarn.html).
From the log, it seems that the download failed.  I've tested that the file
is available via curl.  The error message is:  org/apache/samza/util/Logging

I appreciate any suggestions.

Roger


2015-03-24 17:13:05,469 INFO  [Socket Reader #1 for port 33749] ipc.Server
(Server.java:saslProcess(1294)) - Auth successful for
appattempt_1427226422217_0005_02 (auth:SIMPLE)

2015-03-24 17:13:05,473 INFO  [IPC Server handler 15 on 33749]
containermanager.ContainerManagerImpl
(ContainerManagerImpl.java:startContainerInternal(572)) - Start request for
container_1427226422217_0005_02_01 by user opintel

2015-03-24 17:13:05,473 INFO  [IPC Server handler 15 on 33749]
nodemanager.NMAuditLogger (NMAuditLogger.java:logSuccess(89)) - USER=opintel
IP=10.53.152.54 OPERATION=Start Container Request TARGET=ContainerManageImpl
RESULT=SUCCESS APPID=application_1427226422217_0005
CONTAINERID=container_1427226422217_0005_02_01

2015-03-24 17:13:05,473 INFO  [AsyncDispatcher event handler]
application.Application (ApplicationImpl.java:transition(296)) - Adding
container_1427226422217_0005_02_01 to application
application_1427226422217_0005

2015-03-24 17:13:05,474 INFO  [AsyncDispatcher event handler]
container.Container (ContainerImpl.java:handle(884)) - Container
container_1427226422217_0005_02_01 transitioned from NEW to LOCALIZING

2015-03-24 17:13:05,474 INFO  [AsyncDispatcher event handler]
containermanager.AuxServices (AuxServices.java:handle(175)) - Got event
CONTAINER_INIT for appId application_1427226422217_0005

2015-03-24 17:13:05,475 INFO  [AsyncDispatcher event handler]
localizer.LocalizedResource (LocalizedResource.java:handle(196)) - Resource
http://somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz transitioned from
INIT to DOWNLOADING

2015-03-24 17:13:05,475 INFO  [AsyncDispatcher event handler]
localizer.ResourceLocalizationService
(ResourceLocalizationService.java:handle(596)) - Created localizer for
container_1427226422217_0005_02_01

2015-03-24 17:13:05,480 INFO  [LocalizerRunner for
container_1427226422217_0005_02_01]
localizer.ResourceLocalizationService
(ResourceLocalizationService.java:writeCredentials(1029)) - Writing
credentials to the nmPrivate file
/tmp/hadoop-opintel/nm-local-dir/nmPrivate/container_1427226422217_0005_02_01.tokens.
Credentials list:

2015-03-24 17:13:05,481 INFO  [LocalizerRunner for
container_1427226422217_0005_02_01]
nodemanager.DefaultContainerExecutor
(DefaultContainerExecutor.java:createUserCacheDirs(469)) - Initializing
user opintel

2015-03-24 17:13:05,492 INFO  [LocalizerRunner for
container_1427226422217_0005_02_01]
nodemanager.DefaultContainerExecutor
(DefaultContainerExecutor.java:startLocalizer(103)) - Copying from
/tmp/hadoop-opintel/nm-local-dir/nmPrivate/container_1427226422217_0005_02_01.tokens
to
/tmp/hadoop-opintel/nm-local-dir/usercache/opintel/appcache/application_1427226422217_0005/container_1427226422217_0005_02_01.tokens

2015-03-24 17:13:05,492 INFO  [LocalizerRunner for
container_1427226422217_0005_02_01]
nodemanager.DefaultContainerExecutor
(DefaultContainerExecutor.java:startLocalizer(105)) - CWD set to
/tmp/hadoop-opintel/nm-local-dir/usercache/opintel/appcache/application_1427226422217_0005
=
file:/tmp/hadoop-opintel/nm-local-dir/usercache/opintel/appcache/application_1427226422217_0005

2015-03-24 17:13:05,520 INFO  [IPC Server handler 4 on 8040]
localizer.ResourceLocalizationService
(ResourceLocalizationService.java:update(928)) - DEBUG: FAILED { http://
somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz, 0, ARCHIVE, null },
org/apache/samza/util/Logging

2015-03-24 17:13:05,520 INFO  [IPC Server handler 4 on 8040]
localizer.LocalizedResource (LocalizedResource.java:handle(196)) - Resource
http://somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz transitioned from
DOWNLOADING to FAILED

2015-03-24 17:13:05,520 INFO  [AsyncDispatcher event handler]
container.Container (ContainerImpl.java:handle(884)) - Container
container_1427226422217_0005_02_01 transitioned from LOCALIZING to
LOCALIZATION_FAILED

2015-03-24 17:13:05,521 INFO  [AsyncDispatcher event handler]
localizer.LocalResourcesTrackerImpl
(LocalResourcesTrackerImpl.java:handle(137)) - Container
container_1427226422217_0005_02_01 sent RELEASE event on a resource
request { http://somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz, 0,
ARCHIVE, null } not present in cache.

2015-03-24 17:13:05,521 WARN  [LocalizerRunner for
container_1427226422217_0005_02_01] ipc.Client (Client.java:call(1388))
- interrupted waiting to send rpc request to server

java.lang.InterruptedException

at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:400)

at java.util.concurrent.FutureTask.get(FutureTask.java:187)

at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1029)

at 

Re: Kafka topic naming conventions

2015-03-19 Thread Roger Hoover
Renato,

Thanks for the link.  Some interesting suggestions there as well.

On Thu, Mar 19, 2015 at 2:28 AM, Renato Marroquín Mogrovejo 
renatoj.marroq...@gmail.com wrote:

 There was an interesting discussion over in the Kafka mailing list that
 might give you more ideas, Roger.
 Although they don't mention anything about the number of partitions when
 doing so, maybe it helps anyway.


 Renato M.

 [1] https://www.mail-archive.com/users@kafka.apache.org/msg11976.html

 2015-03-19 5:43 GMT+01:00 Roger Hoover roger.hoo...@gmail.com:

  Thanks, guys.  I was also playing around with including partition count
 and
  even the partition key in the topic name.   My thought was that topics
 may
  have the same data and number of partitions but only differ by partition
  key.  After a while, the naming does get crazy (too long and ugly).  We
  really need a topic metadata store.
 
  On Wed, Mar 18, 2015 at 6:21 PM, Chinmay Soman 
 chinmay.cere...@gmail.com
  wrote:
 
    Yeah! It does seem a bit hackish - but I think this approach promises
    fewer config/operation errors.
  
   Although I think some of these checks can be built within Samza -
  assuming
   Kafka has a metadata store in the near future - the Samza container can
   validate the #topics against this store.
  
   On Wed, Mar 18, 2015 at 6:16 PM, Chris Riccomini 
 criccom...@apache.org
   wrote:
  
Hey Chinmay,
   
Cool, this is good feedback. I didn't think I was *that* crazy. :)
   
Cheers,
Chris
   
On Wed, Mar 18, 2015 at 6:10 PM, Chinmay Soman 
   chinmay.cere...@gmail.com
wrote:
   
  That's what we're doing as well - appending the partition count to the
  Kafka topic name. This actually helps keep track of the #partitions for
  each topic (since Kafka doesn't have a metadata store yet).

 In case of topic expansion - we actually just resort to creating a
  new
 topic. Although that is an overhead - the thought process is that
  this
will
 minimize operational errors. Also, this is necessary to do in case
   we're
 doing some kind of joins.


 On Wed, Mar 18, 2015 at 5:59 PM, Jakob Homan jgho...@gmail.com
   wrote:

  On 18 March 2015 at 17:48, Chris Riccomini 
 criccom...@apache.org
 wrote:
   One thing I haven't seen, but might be relevant, is including
partition
   counts in the topic.
 
  Yeah, but then if you change the partition count later on, you've
  got
  incorrect information forever. Or you need to create a new
 stream,
  which might be a nice forcing function to make sure your join
 isn't
  screwed up.  There'd need to be something somewhere to enforce
 that
  though.
 



 --
 Thanks and regards

 Chinmay Soman

   
  
  
  
   --
   Thanks and regards
  
   Chinmay Soman
  
 



Re: How do you serve the data computed by Samza?

2015-04-02 Thread Roger Hoover
Is it because the Kafka partitioning might not be the same as the storage
partitioning?  So that a slow storage shard will prevent unrelated shards
from getting their messages?

Ah, I think I see what you mean.  If so, then the solution is to make the
Kafka partitioning match the storage partitioning.  In that case, push or
pull is the same, yeah?
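
In other words, use the storage system's own shard function as the Kafka
partitioner so shard i only ever consumes partition i.  A toy sketch (the
hash must be whatever the store actually uses, and the topic must have
exactly numShards partitions):

public final class AlignedPartitioner {
  // Must mirror the serving system's key-to-shard routing exactly
  public static int shardFor(byte[] key, int numShards) {
    return (java.util.Arrays.hashCode(key) & 0x7fffffff) % numShards;
  }
}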

Thanks,

Roger

On Thu, Apr 2, 2015 at 3:21 PM, Roger Hoover roger.hoo...@gmail.com wrote:

 Chinmay,

 Thanks for your input.

 I'm not understanding what the difference is.  With the design that Felix
 laid out, the co-located Kafka consumer is still doing a push to the
 storage system, right?  It just happens to be on the same machine.  How is
 this different from pushing batches from a non-local Samza job?  How does
 the pull-based approach you're thinking of deal with feedback and SLAs?

 Thanks,

 Roger



 On Thu, Apr 2, 2015 at 2:54 PM, Chinmay Soman chinmay.cere...@gmail.com
 wrote:

  My 2 cents: one thing to note about the push model is multi-tenancy.

  When your storage system (Druid for example) is used in a multi-tenant
  fashion, the push model is a bit difficult to operate, primarily because
  there is no real feedback loop from the storage system. Yes - if the
  storage system starts doing badly, then you get timeouts and higher
  latencies - but then you're already in a position where you're probably
  breaking SLAs (for some tenant).

  In that sense, a pull model might be better since the consumer can
  potentially have more visibility into how this particular node is doing.
  Also, the Kafka consumer batches things up, so theoretically you could
  get similar throughput. The downside of this approach is of course that
  the storage system partitioning scheme *has to* line up with the Kafka
  partitioning scheme.

 On Thu, Apr 2, 2015 at 11:41 AM, Roger Hoover roger.hoo...@gmail.com
 wrote:

  Felix,
 
  I see your point about simple Kafka consumers.  My thought was that if
  you're already managing a Samza/YARN deployment then these types of jobs
  would be just another job and not require an additional process
  management/monitoring/operations setup.  If you've already got a way to
  handle vanilla Kafka jobs then it makes sense.
 
   For the push model, the way we're planning to deal with the latency of
   round-trip calls is to batch up pushes to the downstream system.  Both
   Druid Tranquility and the ES transport node protocol allow you to batch
   index requests.  I'm curious if pull would be that much more efficient.
 
  Cheers,
 
  Roger
 
  On Wed, Apr 1, 2015 at 10:26 AM, Felix GV
 fville...@linkedin.com.invalid
  wrote:
 
   Hi Roger,
  
   You bring up good points, and I think the short answer is that there
 are
   trade-offs to everything, of course (:
  
    What I described could definitely be implemented as a Samza job, and I
    think that would make a lot of sense if the data serving system was also
    deployed via YARN. This way, the Samza tasks responsible for ingesting
    and populating the data serving system's nodes could be spawned wherever
    YARN knows these nodes are located. For data serving systems not well
    integrated with YARN however, I'm not sure that there would be that much
    win in using the Samza deployment model. And since the consumers
    themselves are pretty simple (no joining of streams, no local state,
    etc.), this seems to be a case where Samza is a bit overkill and a
    regular Kafka consumer is perfectly fine (except for the YARN-enabled
    auto-deployment aspect, like I mentioned).
  
    As for push versus pull, I think the trade-off is the following: push is
    mostly simpler and more decoupled, as you said, but I think pull would be
    more efficient. The reason for that is that Kafka consumption is very
    efficient (thanks to batching and compression), but most data serving
    systems don't provide a streaming ingest API for pushing data efficiently
    to them; instead they have single-record put/insert APIs which require a
    round-trip to be acknowledged. This is perfectly fine in low-throughput
    scenarios, but does not support the very high throughput of ingestion
    that Kafka can provide. By co-locating the pulling process (i.e.: Kafka
    consumer) with the data serving node, it becomes a bit more affordable
    to do single puts since the (local) round-trip acks would be
    near-instantaneous. Pulling also makes the tracking of offsets across
    different nodes a bit easier, since each node can consume at its own
    pace, and resume at whatever point in the past it needs (i.e.: rewind)
    without affecting the other replicas. Tracking offsets across many
    replicas in the push model is a bit more annoying, though still doable,
    of course.
  
   --
  
   Felix GV
   Data Infrastructure Engineer
   Distributed Data Systems
   LinkedIn
  
   f...@linkedin.com
   linkedin.com/in/felixgv
  
   
   From: Roger Hoover [roger.hoo

Re: Joining Avro records

2015-04-13 Thread Roger Hoover
Hi all,

In case this helps anyone, I was able to create a simple class to do the
join and it works nicely for my use case.  It assumes you have schemas for
the input and output records.

Example (
https://github.com/Quantiply/rico/blob/master/avro-serde/src/test/java/com/quantiply/avro/JoinTest.java#L44-L52
):

GenericRecord in1 = getIn1();
GenericRecord in2 = getIn2();

GenericRecord joined = new Join(getJoinedSchema())
    .merge(in1)
    .merge(in2)
    .getBuilder()
    .set("charlie", "blah blah")
    .build();

Class is here:
https://github.com/Quantiply/rico/blob/master/avro-serde/src/main/java/com/quantiply/avro/Join.java
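
Stripped down, the merge is just a field-name copy against the joined schema;
the core of it looks something like this sketch (type checking and error
handling omitted):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class JoinSketch {
  public static GenericRecord merge(Schema joinedSchema, GenericRecord... inputs) {
    GenericRecord out = new GenericData.Record(joinedSchema);
    for (GenericRecord in : inputs) {
      for (Schema.Field f : in.getSchema().getFields()) {
        if (joinedSchema.getField(f.name()) != null) {
          out.put(f.name(), in.get(f.name()));  // copy by field name
        }
      }
    }
    return out;
  }
}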

Cheers,

Roger

On Thu, Apr 9, 2015 at 12:54 PM, Roger Hoover roger.hoo...@gmail.com
wrote:

 Yi Pan,

 Thanks for your response.  I'm thinking that I'll iterate over the fields
 of the input schemas (similar to this
 https://github.com/apache/samza/blob/samza-sql/samza-sql/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java#L58-L62),
 match them up with the output schema, and then copy the values.  I'll let
 you know how it goes in case it's useful.

 Cheers,

 Roger

 On Thu, Apr 9, 2015 at 12:07 PM, Yi Pan nickpa...@gmail.com wrote:

 Hi, Roger,

 Good question on that. I am actually not aware of any automatic way of
 doing this in Avro. I have tried to add generic Schema and Data interface
 in samza-sql branch to address the morphing of the schemas from input
 streams to the output streams. The basic idea is to have wrapper Schema
 and
 Data classes on-top-of the deserialized objects to access the data fields
 according to the schema w/o changing and copying the actual data fields.
 Hence, when there is a need to morph the input data schemas into a new
 output data schema, we just need an implementation of the new output data
 Schema class that can read the corresponding data fields from the input
 data and write them out in the output schema. An interface function
 transform() is added in the Schema class for this exact purpose.
 Currently,
 it only takes one input data and one example of projection
 transformation
 can be found in the implementation of AvroSchema class. A join case as you
 presented may well be a reason to have an implementation of join with
 multiple input data.

 All the above solution is still experimental and please feel free to
 provide your feedback and comments on that. If we agree that this solution
  is good and suits a broader use case, it can be considered to be used
 outside the SQL context as well.

 Best regards!

 -Yi

 On Thu, Apr 9, 2015 at 8:55 AM, Roger Hoover roger.hoo...@gmail.com
 wrote:

  Hi Milinda and others,
 
  This is an Avro question but since you guys are working on Avro support
 for
  stream SQL, I thought I'd ask you for help.
 
  If I have two records of type A and B as below and want to join them
  similar to SELECT * in SQL to produce a record of type AB, is there a
  simple way to do this with Avro without writing code to copy each field
  individually?
 
  I appreciate any help.
 
  Thanks,
 
  Roger
 
  {
    "name": "A",
    "type": "record",
    "namespace": "fubar",
    "fields": [{"name": "a", "type": "int"}]
  }

  {
    "name": "B",
    "type": "record",
    "namespace": "fubar",
    "fields": [{"name": "b", "type": "int"}]
  }

  {
    "name": "AB",
    "type": "record",
    "namespace": "fubar",
    "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "int"}]
  }
 





Re: Errors and hung job on broker shutdown

2015-04-28 Thread Roger Hoover
At error level logging, this was the only entry in the Samza log:

2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,2] offset[9129395]
Unable to send message from TaskName-Partition 1 to system kafka

Here is the log from the Kafka broker that was shutdown.

http://pastebin.com/afgmLyNF

Thanks,

Roger


On Tue, Apr 28, 2015 at 3:49 PM, Yi Pan nickpa...@gmail.com wrote:

 Roger, could you paste the full log from the Samza container? If you can
 figure out which Kafka broker the message was sent to, it would be helpful
 if we get the log from the broker as well.

 On Tue, Apr 28, 2015 at 3:31 PM, Roger Hoover roger.hoo...@gmail.com
 wrote:

  Hi,
 
  I need some help figuring out what's going on.
 
  I'm running Kafka 0.8.2.1 and Samza 0.9.0 on YARN.  All the topics have
  replication factor of 2.
 
  I'm bouncing the Kafka broker using SIGTERM (with
  controlled.shutdown.enable=true).  I see the Samza job log this message
 and
  then hang (does not exit nor does it make any progress).
 
  2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
  ssp[kafka,my-topic,2] offset[9129395] Unable to send message from
  TaskName-Partition 1 to system kafka
 
  The Kafka consumer (Druid Real-Time node) on the other side then barfs on
  the message:
 
  Exception in thread chief-svc-perf
 kafka.message.InvalidMessageException:
  Message is corrupt (stored crc = 1792882425, computed crc = 3898271689)
  at kafka.message.Message.ensureValid(Message.scala:166)
  at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:101)
  at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
  at
 kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
  at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
  at
 
 
 io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.hasMore(KafkaEightFirehoseFactory.java:106)
  at
 
 
 io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:234)
 
  My questions are:
  1) What is the right way to bounce a Kafka broker?
  2) Is this a bug in Samza that the job hangs after producer request
 fails?
  Has anyone seen this?
 
  Thanks,
 
  Roger
 



Errors and hung job on broker shutdown

2015-04-28 Thread Roger Hoover
Hi,

I need some help figuring out what's going on.

I'm running Kafka 0.8.2.1 and Samza 0.9.0 on YARN.  All the topics have
replication factor of 2.

I'm bouncing the Kafka broker using SIGTERM (with
controlled.shutdown.enable=true).  I see the Samza job log this message and
then hang (does not exit nor does it make any progress).

2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
ssp[kafka,my-topic,2] offset[9129395] Unable to send message from
TaskName-Partition 1 to system kafka

The Kafka consumer (Druid Real-Time node) on the other side then barfs on
the message:

Exception in thread chief-svc-perf kafka.message.InvalidMessageException:
Message is corrupt (stored crc = 1792882425, computed crc = 3898271689)
at kafka.message.Message.ensureValid(Message.scala:166)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:101)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at
io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.hasMore(KafkaEightFirehoseFactory.java:106)
at
io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:234)

My questions are:
1) What is the right way to bounce a Kafka broker?
2) Is this a bug in Samza that the job hangs after producer request fails?
Has anyone seen this?

Thanks,

Roger


Re: Errors and hung job on broker shutdown

2015-04-30 Thread Roger Hoover
Guozhang and Yan,

Thank you both for your responses.  I tried a lot of combinations and I
think I've determined that it's the new producer + snappy that causes the issue.

It never happens with the old producer, and it never happens with lz4 or no
compression.  It only happens when a broker gets restarted (or maybe just
shut down).

The error is not always the same.  I've noticed at least three types of
errors on the Kafka brokers.

1) java.io.IOException: failed to read chunk
at
org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:356)
http://pastebin.com/NZrrEHxU
2) java.lang.OutOfMemoryError: Java heap space
   at
org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:346)
http://pastebin.com/yuxk1BjY
3) java.io.IOException: PARSING_ERROR(2)
  at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
http://pastebin.com/yq98Hx49

I've noticed a couple different behaviors from the Samza producer/job
A) It goes into a long retry loop where this message is logged.  I saw this
with error #1 above.

2015-04-29 18:17:31 Sender [WARN] task[Partition 7]
ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,7] offset[253] Got
error produce response with correlation id 4878 on topic-partition
svc.call.w_deploy.T2UDe2PWRYWcVAAAhMOAwA-1, retrying (2147483646 attempts
left). Error: CORRUPT_MESSAGE

B) The job exits with
org.apache.kafka.common.errors.UnknownServerException (at least when run as
a ThreadJob).  I saw this with error #3 above.

org.apache.samza.SamzaException: Unable to send message from
TaskName-Partition 6 to system kafka.
org.apache.kafka.common.errors.UnknownServerException: The server
experienced an unexpected error when processing the request

This seems most likely to be a bug in the new Kafka producer.  I'll
probably file a JIRA for that project.
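
In the meantime, the workaround on our side is just to switch the producer's
compression codec.  Samza passes systems.<system>.producer.* through to the
Kafka producer config, so with a system named "kafka" the job properties
change is roughly:

systems.kafka.producer.compression.type=lz4   # or drop the setting for no compression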

Thanks,

Roger

On Wed, Apr 29, 2015 at 7:38 PM, Guozhang Wang wangg...@gmail.com wrote:

 And just to answer your first question: SIGTERM with
 controlled.shutdown=true should be OK for bouncing the broker.

 Guozhang

 On Wed, Apr 29, 2015 at 7:36 PM, Guozhang Wang wangg...@gmail.com wrote:

  Roger,
 
  I believe Samza 0.9.0 already uses the Java producer.
 
  Java producer's close() call will try to flush all buffered data to the
  brokers before completing the call. However, if some buffered data's
  destination partition leader is not known, the producer will block on
  refreshing the metadata and then retry sending.
 
  From the broker logs, it seems it does receive the producer request but
  failed to handle it due to Leader not local after the bounce:
 
  
  [2015-04-28 14:26:44,729] WARN [KafkaApi-0] Produce request with
  correlation id 226 from client
  samza_producer-svc_call_w_deploy_to_json-1-1430244278081-3 on partition
  [sys.samza_metrics,0] failed due to Leader not local for partition
  [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
  [2015-04-28 14:26:47,426] WARN [KafkaApi-0] Produce request with
  correlation id 45671 from client
  samza_checkpoint_manager-svc_call_join_deploy-1-1429911482243-4 on
  partition [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] failed
  due to Leader not local for partition
  [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] on broker 0
  (kafka.server.KafkaApis)
  [2015-04-28 14:27:24,578] WARN [KafkaApi-0] Produce request with
  correlation id 12267 from client
  samza_producer-svc_call_join_deploy-1-1429911471254-0 on partition
  [sys.samza_metrics,0] failed due to Leader not local for partition
  [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
  
 
  because for these two topic-partitions (sys.samza_metrics,0 and
  __samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0), their leader has
  been moved to broker id:1,host:sit320w80m7,port:9092. When the producer
  gets the error code from the old leader, it should refresh its metadata,
  get the new leader as broker-1, and retry sending, but for some reason it
  does not refresh its metadata. Without producer logs from the Samza
  container I cannot investigate the issue further.
 
  Which Kafka version does Samza 0.9.0 use?
 
  Guozhang
 
  On Wed, Apr 29, 2015 at 4:30 PM, Yan Fang yanfang...@gmail.com wrote:
 
  Not sure about the Kafka side. From the Samza side, from your
  description (does not exit nor does it make any progress), I think the
  code is stuck in producer.close
  
 
 https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala#L143
  ,
  otherwise, it will throw SamzaException to quit the job. So maybe some
  Kafka experts in this mailing list or Kafka mailing list can help
 
  Fang, Yan
  yanfang...@gmail.com
 
  On Tue, Apr 28, 2015 at 5:35 PM, Roger Hoover roger.hoo...@gmail.com
  wrote:
 
   At error level logging, this was the only entry in the Samza log:
  
   2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
   ssp[kafka

Re: How to configure the Resource Manager endpoint for YARN?

2015-04-15 Thread Roger Hoover
I'll try that.  Thanks, Chris.

On Wed, Apr 15, 2015 at 9:37 AM, Chris Riccomini criccom...@apache.org
wrote:

 Hey Roger,

 Not sure if this makes a difference, but have you tried using:

   export YARN_CONF_DIR=...

 Instead? This is what we use.
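
 For reference, the RM endpoint itself comes from the yarn-site.xml in
 whatever directory that variable points at; the host and port below are
 placeholders:

   <property>
     <name>yarn.resourcemanager.address</name>
     <value>rm-host.example.com:8032</value>
   </property>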

 Cheers,
 Chris

 On Wed, Apr 15, 2015 at 9:33 AM, Roger Hoover roger.hoo...@gmail.com
 wrote:

  Hi,
 
   I'm trying to deploy a job to a small YARN cluster.  How do I tell the
   launcher script where to find the ResourceManager?  I tried creating a
   yarn-site.xml and setting the HADOOP_CONF_DIR environment variable, but it
   doesn't find my config.
 
  2015-04-14 22:02:45 ClientHelper [INFO] trying to connect to RM
  0.0.0.0:8032
  2015-04-14 22:02:45 RMProxy [INFO] Connecting to ResourceManager at /
  0.0.0.0:8032
 
  Thanks,
 
  Roger
 



How to configure the Resource Manager endpoint for YARN?

2015-04-15 Thread Roger Hoover
Hi,

I'm trying to deploy a job to a small YARN cluster.  How do I tell the
launcher script where to find the ResourceManager?  I tried creating a
yarn-site.xml and setting the HADOOP_CONF_DIR environment variable, but it
doesn't find my config.

2015-04-14 22:02:45 ClientHelper [INFO] trying to connect to RM 0.0.0.0:8032
2015-04-14 22:02:45 RMProxy [INFO] Connecting to ResourceManager at /
0.0.0.0:8032

Thanks,

Roger


Re: Joining Avro records

2015-04-09 Thread Roger Hoover
Thanks, Julian.  Good point about needing aliasing for unique names in
SQL.  I didn't know about array_agg...nice.

On Thu, Apr 9, 2015 at 12:35 PM, Julian Hyde jul...@hydromatic.net wrote:

 Much of this is about mapping from logical fields (i.e. the fields you can
 reference in SQL) down to the Avro representation; I’m no expert on that
 mapping, so I’ll focus on the SQL stuff.

 First, SQL doesn’t allow a record to have two fields of the same name, so
 you wouldn’t be allowed to have two “name” fields. When you do a join, you
 might need to alias output columns:

 select stream orders.id, products.id as productId
 from orders
 join products on orders.id = products.id;

 Second, JOIN isn’t the only SQL operator that combines records; GROUP BY
 also combines records. JOIN combines records from different streams, and
 they usually have different types (i.e. different numbers/types of fields),
 whereas GROUP BY combines records from the same stream. Use whichever best
 suits your purpose.

 select stream zipcode, floor(rowtime to hour), array_agg(orderid) as
 orderIds
 from orders
 group by zipcode, floor(rowtime to hour)

 (array_agg is an aggregate function, recently added to the SQL standard,
 that gathers input values into an array. See
 http://www.craigkerstiens.com/2013/04/17/array-agg/.)

 Output:

 { zipcode: “94705”, rowtime: “2015-04-09 11:00:00”, orderIds: [123, 156,
 1056] },
 { zipcode: “94117”, rowtime: “2015-04-09 11:00:00”, orderIds: [45, 777] },
 { zipcode: “94705”, rowtime: “2015-04-09 12:00:00”, orderIds: [55] }

 Julian


 On Apr 9, 2015, at 12:07 PM, Yi Pan nickpa...@gmail.com wrote:

  Hi, Roger,
 
  Good question on that. I am actually not aware of any automatic way of
  doing this in Avro. I have tried to add generic Schema and Data interface
  in samza-sql branch to address the morphing of the schemas from input
  streams to the output streams. The basic idea is to have wrapper Schema
 and
  Data classes on-top-of the deserialized objects to access the data fields
  according to the schema w/o changing and copying the actual data fields.
  Hence, when there is a need to morph the input data schemas into a new
  output data schema, we just need an implementation of the new output data
  Schema class that can read the corresponding data fields from the input
  data and write them out in the output schema. An interface function
  transform() is added in the Schema class for this exact purpose.
 Currently,
  it only takes one input, and an example of a projection transformation
  can be found in the implementation of the AvroSchema class. A join case as
 you
  presented may well be a reason to have an implementation of join with
  multiple input data.
 
  All of the above is still experimental, so please feel free to
  provide your feedback and comments. If we agree that this solution
  is good and suits a broader use case, it can be considered to be used
  outside the SQL context as well.
 
  Best regards!
 
  -Yi
 
  On Thu, Apr 9, 2015 at 8:55 AM, Roger Hoover roger.hoo...@gmail.com
 wrote:
 
  Hi Milinda and others,
 
  This is an Avro question but since you guys are working on Avro support
 for
  stream SQL, I thought I'd ask you for help.
 
  If I have two records of type A and B as below and want to join them
  similar to SELECT * in SQL to produce a record of type AB, is there a
  simple way to do this with Avro without writing code to copy each field
  individually?
 
  I appreciate any help.
 
  Thanks,
 
  Roger
 
  {
    "name": "A",
    "type": "record",
    "namespace": "fubar",
    "fields": [{"name": "a", "type": "int"}]
  }

  {
    "name": "B",
    "type": "record",
    "namespace": "fubar",
    "fields": [{"name": "b", "type": "int"}]
  }

  {
    "name": "AB",
    "type": "record",
    "namespace": "fubar",
    "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "int"}]
  }
 




Re: Joining Avro records

2015-04-09 Thread Roger Hoover
Yi Pan,

Thanks for your response.  I'm thinking that I'll iterate over the fields
of the input schemas (similar to this
https://github.com/apache/samza/blob/samza-sql/samza-sql/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java#L58-L62),
match them up with the output schema and then copy the values.  I'll let
you know how it goes in case it's useful.
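
For reference, a minimal sketch of that approach with Avro's GenericRecord
API (it assumes field names/types match between the input and output schemas,
with later inputs winning on name collisions):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class AvroJoin {
  // Copy each input field whose name also exists in the output schema.
  public static GenericRecord join(Schema outSchema, GenericRecord... inputs) {
    GenericRecord out = new GenericData.Record(outSchema);
    for (GenericRecord in : inputs) {
      for (Schema.Field field : in.getSchema().getFields()) {
        if (outSchema.getField(field.name()) != null) {
          out.put(field.name(), in.get(field.name()));
        }
      }
    }
    return out;
  }
}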

Cheers,

Roger

On Thu, Apr 9, 2015 at 12:07 PM, Yi Pan nickpa...@gmail.com wrote:

 Hi, Roger,

 Good question on that. I am actually not aware of any automatic way of
 doing this in Avro. I have tried to add generic Schema and Data interface
 in samza-sql branch to address the morphing of the schemas from input
 streams to the output streams. The basic idea is to have wrapper Schema and
 Data classes on-top-of the deserialized objects to access the data fields
 according to the schema w/o changing and copying the actual data fields.
 Hence, when there is a need to morph the input data schemas into a new
 output data schema, we just need an implementation of the new output data
 Schema class that can read the corresponding data fields from the input
 data and write them out in the output schema. An interface function
 transform() is added in the Schema class for this exact purpose. Currently,
 it only takes one input, and an example of a projection transformation
 can be found in the implementation of the AvroSchema class. A join case as you
 presented may well be a reason to have an implementation of join with
 multiple input data.

 All of the above is still experimental, so please feel free to
 provide your feedback and comments. If we agree that this solution
 is good and suits a broader use case, it can be considered to be used
 outside the SQL context as well.

 Best regards!

 -Yi

 On Thu, Apr 9, 2015 at 8:55 AM, Roger Hoover roger.hoo...@gmail.com
 wrote:

  Hi Milinda and others,
 
  This is an Avro question but since you guys are working on Avro support
 for
  stream SQL, I thought I'd ask you for help.
 
  If I have two records of type A and B as below and want to join them
  similar to SELECT * in SQL to produce a record of type AB, is there a
  simple way to do this with Avro without writing code to copy each field
  individually?
 
  I appreciate any help.
 
  Thanks,
 
  Roger
 
  {
    "name": "A",
    "type": "record",
    "namespace": "fubar",
    "fields": [{"name": "a", "type": "int"}]
  }

  {
    "name": "B",
    "type": "record",
    "namespace": "fubar",
    "fields": [{"name": "b", "type": "int"}]
  }

  {
    "name": "AB",
    "type": "record",
    "namespace": "fubar",
    "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "int"}]
  }
 



Joining Avro records

2015-04-09 Thread Roger Hoover
Hi Milinda and others,

This is an Avro question but since you guys are working on Avro support for
stream SQL, I thought I'd ask you for help.

If I have two records of type A and B as below and want to join them
similar to SELECT * in SQL to produce a record of type AB, is there a
simple way to do this with Avro without writing code to copy each field
individually?

I appreciate any help.

Thanks,

Roger

{
  "name": "A",
  "type": "record",
  "namespace": "fubar",
  "fields": [{"name": "a", "type": "int"}]
}

{
  "name": "B",
  "type": "record",
  "namespace": "fubar",
  "fields": [{"name": "b", "type": "int"}]
}

{
  "name": "AB",
  "type": "record",
  "namespace": "fubar",
  "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "int"}]
}


Re: Newbie questions after completing Hello Samza about performance and project setup

2015-04-09 Thread Roger Hoover
Hi Warren,

Yes, I think Hello Samza is the template project to work from.  I believe
that the slow message rate that you are seeing is because it's subscribed
to the wikipedia IRC stream which may only generate a few events per
second.

That said, some of the example configuration for the hello samza demo is
not tuned for performance.

In general, enabling compression can help a lot for jobs that are I/O
bound.  Enabling lz4 on JSON data, for example, shrinks it 10x.

On the consumer side, setting  task.consumer.batch.size might help.

On the producer side, you might want to play around with these settings.

systems.kafka.producer.compression.type
systems.kafka.producer.batch.size
systems.kafka.producer.linger.ms

http://samza.apache.org/learn/documentation/0.9/jobs/configuration-table.html
http://kafka.apache.org/documentation.html#newproducerconfigs
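
For example, in the job's properties file (the values below are illustrative
starting points, not tuned recommendations):

systems.kafka.producer.compression.type=lz4
systems.kafka.producer.batch.size=262144
systems.kafka.producer.linger.ms=5
task.consumer.batch.size=100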

Cheers,

Roger

On Thu, Apr 9, 2015 at 1:14 AM, Warren Henning warren.henn...@gmail.com
wrote:

 Hi,

 I ran the commands in http://samza.apache.org/startup/hello-samza/0.9/
 successfully. Fascinating stuff!

 I was running all the processes on my (fairly recent model) Macbook Pro.
 One aspect I've heard about Kafka and Samza is performance -- handling
 thousands of messages a second. E.g.,

 http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
 talks about doing millions of writes a second. The rate at which the
 console emitted new messages seemed like a rate far slower than that --
 maybe something on the order of 1-2 a second. I ran the commands and
 everything exactly as is listed on the tutorial page.

 Of course a laptop is vastly different from a production setup -- what kind
 of assumptions can you make about performance of Samza jobs in development
 mode? I realize it depends on what you're doing -- it's just very different
 from what I was expecting.

 Also, I'm not really sure about the best way to get started with writing my
 own Samza jobs. Is there a project template to work off of? Is the Hello
 Samza project it? Maybe import the Maven POM into a favorite IDE and rip
 out the Wikipedia-related classes? As someone who has written Java before
 but doesn't write it every day, it wasn't immediately clear to me.

 Apologies if these are addressed in blog posts/FAQs/documentation and I
 failed to research them adequately.

 Thanks!

 Warren



Re: How to configure log4j separately for the AM versus containers?

2015-06-23 Thread Roger Hoover
Ah, this seems to work.  I saw that YarnJob.scala was referencing __package
to launch the AM itself.

yarn.am.opts=-Xmx768m -XX:+UseSerialGC
-Dlog4j.configuration=file://$(pwd)/__package/lib/log4j-am.xml
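
For reference, a minimal log4j-am.xml along these lines (an assumed file;
the containers' log4j.xml would set the root to ERROR instead):

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
  <appender name="console" class="org.apache.log4j.ConsoleAppender">
    <layout class="org.apache.log4j.PatternLayout">
      <param name="ConversionPattern" value="%d %c [%p] %m%n"/>
    </layout>
  </appender>
  <root>
    <priority value="info"/>
    <appender-ref ref="console"/>
  </root>
</log4j:configuration>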

On Tue, Jun 23, 2015 at 12:40 PM, Roger Hoover roger.hoo...@gmail.com
wrote:

 Hi,

 I want the App Master to log at INFO level and the container to log at
 ERROR.  Is there a way to configure the AM to use a different log4j config
 file?

 I'm trying to set yarn.am.opts but couldn't get it to work with
 system properties.

 yarn.am.opts=-Xmx768m -XX:+UseSerialGC
 -Dlog4j.configuration=file://${user.dir}/lib/log4j-am.xml

 gives a bad substitution error.

 This actually works but it feels too ugly.

 yarn.am.opts=-Xmx768m -XX:+UseSerialGC -Dlog4j.configuration=file://$(pwd
 | cut -d/ -f-9 | xargs -IXXX -n1 find XXX -name log4j-am.xml -print)

 What's the right way to do it?

 Thanks,

 Roger



How to configure log4j separately for the AM versus containers?

2015-06-23 Thread Roger Hoover
Hi,

I want the App Master to log at INFO level and the container to log at
ERROR.  Is there a way to configure the AM to use a different log4j config
file?

I'm trying to set yarn.am.opts but couldn't get it to work with
system properties.

yarn.am.opts=-Xmx768m -XX:+UseSerialGC
-Dlog4j.configuration=file://${user.dir}/lib/log4j-am.xml

gives a bad substitution error.

This actually works but it feels too ugly.

yarn.am.opts=-Xmx768m -XX:+UseSerialGC -Dlog4j.configuration=file://$(pwd |
cut -d/ -f-9 | xargs -IXXX -n1 find XXX -name log4j-am.xml -print)

What's the right way to do it?

Thanks,

Roger


Re: Samza hung after bootstrapping

2015-06-21 Thread Roger Hoover
Hi Yan,

I've uploaded a file with TRACE level logging here:
http://filebin.ca/261yhsTZcZQZ/samza-container-0.log.gz

I really appreciate your help as this is a critical issue for me.

Thanks,

Roger

On Fri, Jun 19, 2015 at 12:05 PM, Yan Fang yanfang...@gmail.com wrote:

 Hi Roger,

  but it only spawns one container and still hangs after bootstrap
 -- this is probably because your local machine does not have enough
 resources for the second container. I checked your log file: each
 container is about 4GB.

 When I run it on our YARN cluster with a single container, it works
 correctly.  When I tried it with 5 containers, it gets hung after consuming
 the bootstrap topic.
-- Have you figured it out? I have looked at your log and also the
 code. My suspicion is that there is a null envelope somehow blocking the
 process. If you can paste the trace level log, it will be more helpful
 because many logs in chooser are trace level.

 Thanks,

 Fang, Yan
 yanfang...@gmail.com

 On Thu, Jun 18, 2015 at 5:20 PM, Roger Hoover roger.hoo...@gmail.com
 wrote:

  I need some help.  I have a job which bootstraps one stream and then is
  supposed to read from two.  When I run it on our YARN cluster with a
 single
  container, it works correctly.  When I tried it with 5 containers, it
 gets
  hung after consuming the bootstrap topic.  I ran it with the grid script
 on
  my laptop (Mac OS X) with yarn.container.count=2 but it only spawns one
  container and still hangs after bootstrap.
 
  Debug logs are here: http://pastebin.com/af3KPvju
 
  I looked at JMX metrics and see:
  - Task Metrics - no value for kafka offset of non-bootstrapped stream
  - SystemConsumerMetrics
    - choose null keeps incrementing
    - ssps-needed-by-chooser: 1
    - unprocessed-messages: 62k
  - Bootstrapping Chooser
    - lagging-partitions: 4
    - lagging-batch-streams: 4
    - batch-resets: 0
 
  Has anyone seen this or can offer ideas of how to better debug it?
 
  I'm using Samza 0.9.0 and YARN 2.4.0.
 
  Thanks!
 
  Roger
 



Re: Samza hung after bootstrapping

2015-06-21 Thread Roger Hoover
I think I see what's happening.

When there are 8 tasks and I set yarn.container.count=8, then each
container is responsible for a single task.  However, the
systemStreamLagCounts map (
https://github.com/apache/samza/blob/0.9.0/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala#L77)
and laggingSystemStreamPartitions (
https://github.com/apache/samza/blob/0.9.0/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala#L83)
are configured to track all partitions for the bootstrap topic rather than
just the one partition assigned to this task.

Later in the log, we see that the task/container completed bootstrap for
its own partition.

2015-06-21 12:28:55 org.apache.samza.system.chooser.BootstrappingChooser
[DEBUG] Bootstrap stream partition is fully caught up:
SystemStreamPartition [kafka, deploy.svc.tlrnsZOYQA6wrwAA4FLqZA, 0]

but the Bootstrapping Chooser still thinks that the remaining partitions
(assigned to other tasks in other containers) need to be completed.  JMX at
this point shows 7 lagging partitions of the 8 original partition count.

I'm wondering why no one has run into this.  Doesn't LinkedIn use
partitioned bootstrapped topics?

Thanks,

Roger

On Sun, Jun 21, 2015 at 12:22 PM, Roger Hoover roger.hoo...@gmail.com
wrote:

 Hi Yan,

 I've uploaded a file with TRACE level logging here:
 http://filebin.ca/261yhsTZcZQZ/samza-container-0.log.gz

 I really appreciate your help as this is a critical issue for me.

 Thanks,

 Roger

 On Fri, Jun 19, 2015 at 12:05 PM, Yan Fang yanfang...@gmail.com wrote:

 Hi Roger,

  but it only spawns one container and still hangs after bootstrap
 -- this is probably because your local machine does not have enough
 resources for the second container. I checked your log file: each
 container is about 4GB.

 When I run it on our YARN cluster with a single container, it works
 correctly.  When I tried it with 5 containers, it gets hung after
 consuming
 the bootstrap topic.
-- Have you figured it out? I have looked at your log and also the
 code. My suspicion is that there is a null envelope somehow blocking the
 process. If you can paste the trace level log, it will be more helpful
 because many logs in chooser are trace level.

 Thanks,

 Fang, Yan
 yanfang...@gmail.com

 On Thu, Jun 18, 2015 at 5:20 PM, Roger Hoover roger.hoo...@gmail.com
 wrote:

  I need some help.  I have a job which bootstraps one stream and then is
  supposed to read from two.  When I run it on our YARN cluster with a
 single
  container, it works correctly.  When I tried it with 5 containers, it
 gets
  hung after consuming the bootstrap topic.  I ran it with the grid
 script on
  my laptop (Mac OS X) with yarn.container.count=2 but it only spawns one
  container and still hangs after bootstrap.
 
  Debug logs are here: http://pastebin.com/af3KPvju
 
  I looked at JMX metrics and see:
  - Task Metrics - no value for kafka offset of non-bootstrapped stream
  - SystemConsumerMetrics
    - choose null keeps incrementing
    - ssps-needed-by-chooser: 1
    - unprocessed-messages: 62k
  - Bootstrapping Chooser
    - lagging-partitions: 4
    - lagging-batch-streams: 4
    - batch-resets: 0
 
  Has anyone seen this or can offer ideas of how to better debug it?
 
  I'm using Samza 0.9.0 and YARN 2.4.0.
 
  Thanks!
 
  Roger
 





Re: [VOTE] Apache Samza 0.9.1 RC0

2015-06-21 Thread Roger Hoover
Hi all,

Do you think we could get this bootstrapping bug fixed before 0.9.1
release?  It seems like a critical bug.

https://issues.apache.org/jira/browse/SAMZA-720

Thanks,

Roger

On Sat, Jun 20, 2015 at 10:38 PM, Yan Fang yanfang...@gmail.com wrote:

 Agree. I will test it this weekend.

 Thanks,

 Fang, Yan
 yanfang...@gmail.com

 On Sat, Jun 20, 2015 at 3:46 PM, Guozhang Wang wangg...@gmail.com wrote:

  Since we only get one vote so far, I think I have to extend the vote
  deadline. Let's set it to next Monday 6pm.
 
  Please check the candidate and vote for your opinions.
 
  Guozhang
 
  On Fri, Jun 19, 2015 at 10:03 AM, Yi Pan nickpa...@gmail.com wrote:
 
   +1. Ran the Samza failure test suite and succeeded over night.
  
   On Wed, Jun 17, 2015 at 5:54 PM, Guozhang Wang wangg...@gmail.com
  wrote:
  
Hey all,
   
This is a call for a vote on a release of Apache Samza 0.9.1. This
 is a
bug-fix release against 0.9.0.
   
The release candidate can be downloaded from here:
   
http://people.apache.org/~guozhang/samza-0.9.1-rc0/
   
The release candidate is signed with pgp key 911402D8, which is
included in the repository's KEYS file:
   
   
   
  
 
 https://git-wip-us.apache.org/repos/asf?p=samza.git;a=blob_plain;f=KEYS;hb=6f5bafb6cd93934781161eb6b1868d11ea347c95
   
and can also be found on keyservers:
   
http://pgp.mit.edu/pks/lookup?op=get&search=0x911402D8
   
The git tag is release-0.9.1-rc0 and signed with the same pgp key:
   
   
   
  
 
 https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=e78b9e7f34650538b4bb68b338eb472b98a5709e
   
Test binaries have been published to Maven's staging repository, and
  are
available here:
   
5 critical bugs were resolved for this release:
   
   
   
  
 
 https://issues.apache.org/jira/browse/SAMZA-715?jql=project%20%3D%20SAMZA%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20%28Resolved%2C%20Closed%29
   
The vote will be open for 72 hours (ends at 6:00pm Saturday,
 06/20/2015
   ).
Please download the release candidate, check the hashes/signature,
  build
   it
and test it, and then please vote:
   
[ ] +1 approve
[ ] +0 no opinion
[ ] -1 disapprove (and reason why)
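
    For the signature check, roughly (artifact names are illustrative):

    gpg --recv-keys 911402D8
    gpg --verify samza-0.9.1-rc0-src.tgz.asc samza-0.9.1-rc0-src.tgz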
   
-- Guozhang
   
  
 
 
 
  --
  -- Guozhang
 



Re: Samza hung after bootstrapping

2015-06-20 Thread Roger Hoover
Thank you, Yan.  I'll get a trace level log as soon as I can.

Sent from my iPhone

 On Jun 19, 2015, at 12:05 PM, Yan Fang yanfang...@gmail.com wrote:
 
 Hi Roger,
 
  but it only spawns one container and still hangs after bootstrap
-- this is probably because your local machine does not have enough
 resources for the second container. I checked your log file: each
 container is about 4GB.
 
 When I run it on our YARN cluster with a single container, it works
 correctly.  When I tried it with 5 containers, it gets hung after consuming
 the bootstrap topic.
   -- Have you figured it out? I have looked at your log and also the
 code. My suspicion is that there is a null envelope somehow blocking the
 process. If you can paste the trace level log, it will be more helpful
 because many logs in chooser are trace level.
 
 Thanks,
 
 Fang, Yan
 yanfang...@gmail.com
 
 On Thu, Jun 18, 2015 at 5:20 PM, Roger Hoover roger.hoo...@gmail.com
 wrote:
 
 I need some help.  I have a job which bootstraps one stream and then is
 supposed to read from two.  When I run it on our YARN cluster with a single
 container, it works correctly.  When I tried it with 5 containers, it gets
 hung after consuming the bootstrap topic.  I ran it with the grid script on
 my laptop (Mac OS X) with yarn.container.count=2 but it only spawns one
 container and still hangs after bootstrap.
 
 Debug logs are here: http://pastebin.com/af3KPvju
 
 I looked at JMX metrics and see:
 - Task Metrics - no value for kafka offset of non-bootstrapped stream
 - SystemConsumerMetrics
   - choose null keeps incrementing
   - ssps-needed-by-chooser: 1
   - unprocessed-messages: 62k
 - Bootstrapping Chooser
   - lagging-partitions: 4
   - lagging-batch-streams: 4
   - batch-resets: 0
 
 Has anyone seen this or can offer ideas of how to better debug it?
 
 I'm using Samza 0.9.0 and YARN 2.4.0.
 
 Thanks!
 
 Roger
 


Samza hung after bootstrapping

2015-06-18 Thread Roger Hoover
I need some help.  I have a job which bootstraps one stream and then is
supposed to read from two.  When I run it on our YARN cluster with a single
container, it works correctly.  When I tried it with 5 containers, it gets
hung after consuming the bootstrap topic.  I ran it with the grid script on
my laptop (Mac OS X) with yarn.container.count=2 but it only spawns one
container and still hangs after bootstrap.

Debug logs are here: http://pastebin.com/af3KPvju

I looked at JMX metrics and see:
- Task Metrics - no value for kafka offset of non-bootstrapped stream
- SystemConsumerMetrics
  - choose null keeps incrementing
  - ssps-needed-by-chooser: 1
  - unprocessed-messages: 62k
- Bootstrapping Chooser
  - lagging-partitions: 4
  - lagging-batch-streams: 4
  - batch-resets: 0

Has anyone seen this or can offer ideas of how to better debug it?

I'm using Samza 0.9.0 and YARN 2.4.0.

Thanks!

Roger


Re: [VOTE] Apache Samza 0.9.1 RC0

2015-06-22 Thread Roger Hoover
Yan,

I tested the patch locally and it looks good.  Creating a patched release for
myself to test in our environment.  Thanks, again.

Sent from my iPhone

 On Jun 22, 2015, at 10:59 AM, Yi Pan nickpa...@gmail.com wrote:
 
 Hi, Yan,
 
 Thanks a lot for the quick fix on the mentioned bugs. It seems the fix for
 SAMZA-720 is pretty localized and I am OK to push it into 0.9.1. I will be
 working on back porting those changes to 0.9.1 later today and fix all the
 release related issues.
 
 Thanks!
 
 -Yi
 
 On Mon, Jun 22, 2015 at 10:30 AM, Roger Hoover roger.hoo...@gmail.com
 wrote:
 
 Yan,
 
 You rock.  Thank you so much for the quick fix.  I'm working on building
 and testing the patch.
 
 Cheers,
 
 Roger
 
 On Mon, Jun 22, 2015 at 1:09 AM, Yan Fang yanfang...@gmail.com wrote:
 
 Hi guys,
 
 1. I have the difficulty in building the 0.9.1 branch. I think this is
 mainly related to SAMZA-721
 https://issues.apache.org/jira/browse/SAMZA-721.
 
 2. Also, https://issues.apache.org/jira/browse/SAMZA-712 seems bothering
 people as well.
 
 3. https://issues.apache.org/jira/browse/SAMZA-720 is a critical bug we
 need to fix. Have already attached a patch.
 
 4. There is no maven staging link.
 
 Thanks,
 
 Fang, Yan
 yanfang...@gmail.com
 
 On Sun, Jun 21, 2015 at 1:53 PM, Roger Hoover roger.hoo...@gmail.com
 wrote:
 
 Hi all,
 
 Do you think we could get this bootstrapping bug fixed before 0.9.1
 release?  It seems like a critical bug.
 
 https://issues.apache.org/jira/browse/SAMZA-720
 
 Thanks,
 
 Roger
 
 On Sat, Jun 20, 2015 at 10:38 PM, Yan Fang yanfang...@gmail.com
 wrote:
 
 Agree. I will test it this weekend.
 
 Thanks,
 
 Fang, Yan
 yanfang...@gmail.com
 
 On Sat, Jun 20, 2015 at 3:46 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
 Since we only get one vote so far, I think I have to extend the
 vote
 deadline. Let's set it to next Monday 6pm.
 
 Please check the candidate and vote for your opinions.
 
 Guozhang
 
 On Fri, Jun 19, 2015 at 10:03 AM, Yi Pan nickpa...@gmail.com
 wrote:
 
 +1. Ran the Samza failure test suite and succeeded over night.
 
 On Wed, Jun 17, 2015 at 5:54 PM, Guozhang Wang 
 wangg...@gmail.com
 
 wrote:
 
 Hey all,
 
 This is a call for a vote on a release of Apache Samza 0.9.1.
 This
 is a
 bug-fix release against 0.9.0.
 
 The release candidate can be downloaded from here:
 
 http://people.apache.org/~guozhang/samza-0.9.1-rc0/
 
 The release candidate is signed with pgp key 911402D8, which is
 included in the repository's KEYS file:
 https://git-wip-us.apache.org/repos/asf?p=samza.git;a=blob_plain;f=KEYS;hb=6f5bafb6cd93934781161eb6b1868d11ea347c95
 
 and can also be found on keyservers:
 
  http://pgp.mit.edu/pks/lookup?op=get&search=0x911402D8
 
 The git tag is release-0.9.1-rc0 and signed with the same pgp
 key:
 https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=e78b9e7f34650538b4bb68b338eb472b98a5709e
 
 Test binaries have been published to Maven's staging
 repository,
 and
 are
 available here:
 
 5 critical bugs were resolved for this release:
 https://issues.apache.org/jira/browse/SAMZA-715?jql=project%20%3D%20SAMZA%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20%28Resolved%2C%20Closed%29
 
  The vote will be open for 72 hours (ends at 6:00pm Saturday,
 06/20/2015
 ).
 Please download the release candidate, check the
 hashes/signature,
 build
 it
 and test it, and then please vote:
 
 [ ] +1 approve
 [ ] +0 no opinion
 [ ] -1 disapprove (and reason why)
 
 -- Guozhang
 
 
 
 --
 -- Guozhang
 


Re: Samza job throughput much lower than Kafka throughput

2015-05-21 Thread Roger Hoover
Oops.  Sent too soon.  I mean:

producer.batch.size=262144
producer.linger.ms=5
producer.compression.type=lz4


On Thu, May 21, 2015 at 9:00 AM, Roger Hoover roger.hoo...@gmail.com
wrote:

 Hi George,

 You might also try tweaking the producer settings.

 producer.batch.size=262144
 producer.linger.ms=5
 producer.compression.type: lz4

 On Wed, May 20, 2015 at 9:30 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hi George,

 Is there any reason you need to set the following configs?

 systems.kafka.consumer.fetch.wait.max.ms= 1

 This setting will basically disable long polling of the consumer, which will
 then busy-fetch data from the broker; this has a large impact on network
 latency, especially when the consumer is already caught up with the Kafka
 broker.
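
 In other words, leave the long-poll settings at (or near) the consumer
 defaults, e.g. (these are, if I remember right, the 0.8 consumer defaults):

 systems.kafka.consumer.fetch.wait.max.ms=100
 systems.kafka.consumer.fetch.min.bytes=1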

 Also, when you say it is slower than a program reading directly from
 Kafka, which consumer did your program use to read data from Kafka?

 Guozhang


 On Wed, May 20, 2015 at 5:01 PM, George Li g...@ca.ibm.com wrote:

  Hi Yi,
 
  Thanks for the reply. Below is my job config and code.
 
  When we run this job inside our dev docker container, which has
 zookeeper,
  broker, and yarn installed locally,  its throughput is at least 50%
 higher
  than our cluster run's.
 
  Thanks,
 
  George
 
  Configuration:
 
  job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
  job.name=container-performance
 
  # YARN
  yarn.container.count=1
  yarn.container.memory.mb=2548
  yarn.package.path={my package on hdfs}
  yarn.container.retry.count=0
  yarn.am.container.memory.mb=2048
  yarn.am.jmx.enabled=false
 
  # Task
  task.opts=-server -Xmx1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC
  -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark
  -XX:+DisableExplicitGC -Djava.awt.headless=true
 
  task.class=samza.TestPerformanceTask
  task.inputs=kafka.throughput-test2
  task.log.interval=100
  task.checkpoint.factory =
  org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
  task.checkpoint.system=kafka
  task.checkpoint.replication.factor=1
 
  # Kafka System (only used for coordinator stream in this test)
 
 
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
  systems.kafka.samza.fetch.threshold=5
 
  systems.kafka.consumer.zookeeper.connect= {zookeeper}
  systems.kafka.producer.bootstrap.servers={broker node}
  systems.kafka.consumer.auto.offset.reset=smallest
  systems.kafka.consumer.socket.receive.buffer.bytes= 220
  systems.kafka.consumer.fetch.message.max.bytes= 110
  systems.kafka.consumer.fetch.min.bytes= 1
  systems.kafka.consumer.fetch.wait.max.ms= 1
 
  #define coordinator system
  job.coordinator.system=kafka
  job.coordinator.replication.factor=1
 
  systems.kafka.streams.throughput-test2.samza.reset.offset=true
  systems.kafka.streams.throughput-test2.samza.offset.default=oldest
  ~
 
  Job's code. This is mostly a copy-paste of the one in the repository
 
  object TestPerformanceTask {
    // No thread safety is needed for these variables because they're
    // mutated in the process method, which is single threaded.
    var messagesProcessed = 0
    var startTime = 0L
  }

  class TestPerformanceTask extends StreamTask with InitableTask with Logging {
    import TestPerformanceTask._

    /**
     * How many messages to process before a log message is printed.
     */
    var logInterval = 1

    /**
     * How many messages to process before shutting down.
     */
    var maxMessages = 1000

    var outputSystemStream: Option[SystemStream] = None

    def init(config: Config, context: TaskContext) {
      logInterval = config.getInt("task.log.interval", 1)
      maxMessages = config.getInt("task.max.messages", 1000)
      outputSystemStream = Option(config.get("task.outputs",
        null)).map(Util.getSystemStreamFromNames(_))
      println("init!!")
    }

    def process(envelope: IncomingMessageEnvelope, collector: MessageCollector,
      coordinator: TaskCoordinator) {
      if (startTime == 0) {
        startTime = System.currentTimeMillis
      }

      if (outputSystemStream.isDefined) {
        collector.send(new OutgoingMessageEnvelope(outputSystemStream.get,
          envelope.getKey, envelope.getMessage))
      }

      messagesProcessed += 1

      if (messagesProcessed % logInterval == 0) {
        val seconds = (System.currentTimeMillis - startTime) / 1000
        println("Processed %s messages in %s seconds." format
          (messagesProcessed, seconds))
      }

      if (messagesProcessed >= maxMessages) {
        coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)
      }
    }
  }
 
 
 
  From:   Yi Pan nickpa...@gmail.com
  To: dev@samza.apache.org,
  Date:   20/05/2015 05:03 PM
  Subject:Re: Samza job throughput much lower than Kafka
 throughput
 
 
 
  Hi, George,
 
  Could you share w/ us the code and configuration of your sample test
 job?
  Thanks!
 
  -Yi
 
  On Wed, May 20, 2015 at 1:19 PM

Re: Thoughts and observations on Samza

2015-07-07 Thread Roger Hoover
Metamorphosis...nice. :)

This has been a great discussion.  As a user of Samza who's recently
integrated it into a relatively large organization, I just want to add
support to a few points already made.

The biggest hurdles to adoption of Samza as it currently exists that I've
experienced are:
1) YARN - YARN is overly complex in many environments where Puppet would do
just fine but it was the only mechanism to get fault tolerance.
2) Configuration - I think I like the idea of configuring most of the job
in code rather than config files.  In general, I think the goal should be
to make it harder to make mistakes, especially of the kind where the code
expects something and the config doesn't match.  The current config is
quite intricate and error-prone.  For example, the application logic may
depend on bootstrapping a topic but rather than asserting that in the code,
you have to rely on getting the config right.  Likewise with serdes, the
Java representations produced by various serdes (JSON, Avro, etc.) are not
equivalent so you cannot just reconfigure a serde without changing the
code.   It would be nice for jobs to be able to assert what they expect
from their input topics in terms of partitioning.  This is getting a little
off topic but I was even thinking about creating a Samza "config linter"
that would sanity check a set of configs.  Especially in organizations
where config is managed by a different team than the application developer,
it's very hard to avoid config mistakes.
3) Java/Scala centric - for many teams (especially DevOps-type folks), the
pain of the Java toolchain (maven, slow builds, weak command line support,
configuration over convention) really inhibits productivity.  As more and
more high-quality clients become available for Kafka, I hope they'll follow
Samza's model.  Not sure how much it affects the proposals in this thread
but please consider other languages in the ecosystem as well.  From what
I've heard, Spark has more Python users than Java/Scala.
(FYI, we added a Jython wrapper for the Samza API
https://github.com/Quantiply/rico/tree/master/jython/src/main/java/com/quantiply/samza
and are working on a Yeoman generator
https://github.com/Quantiply/generator-rico for Jython/Samza projects to
alleviate some of the pain)

I also want to underscore Jay's point about improving the user experience.
That's a very important factor for adoption.  I think the goal should be to
make Samza as easy to get started with as something like Logstash.
Logstash is vastly inferior in terms of capabilities to Samza but it's easy
to get started and that makes a big difference.

Cheers,

Roger





On Tue, Jul 7, 2015 at 3:29 AM, Gianmarco De Francisci Morales 
g...@apache.org wrote:

 Forgot to add. On the naming issues, Kafka Metamorphosis is a clear winner
 :)

 --
 Gianmarco

 On 7 July 2015 at 13:26, Gianmarco De Francisci Morales g...@apache.org
 wrote:

  Hi,
 
  @Martin, thanks for you comments.
  Maybe I'm missing some important point, but I think coupling the releases
  is actually a *good* thing.
  To make an example, would it be better if the MR and HDFS components of
  Hadoop had different release schedules?
 
  Actually, keeping the discussion in a single place would make agreeing on
  releases (and backwards compatibility) much easier, as everybody would be
  responsible for the whole codebase.
 
  That said, I like the idea of absorbing samza-core as a sub-project, and
  leave the fancy stuff separate.
  It probably gives 90% of the benefits we have been discussing here.
 
  Cheers,
 
  --
  Gianmarco
 
  On 7 July 2015 at 02:30, Jay Kreps jay.kr...@gmail.com wrote:
 
  Hey Martin,
 
  I agree coupling release schedules is a downside.
 
  Definitely we can try to solve some of the integration problems in
  Confluent Platform or in other distributions. But I think this ends up
  being really shallow. I guess I feel to really get a good user
 experience
  the two systems have to kind of feel like part of the same thing and you
  can't really add that in later--you can put both in the same
 downloadable
  tar file but it doesn't really give a very cohesive feeling. I agree
 that
  ultimately any of the project stuff is as much social and naming as
  anything else--theoretically two totally independent projects could work
  to
  tightly align. In practice this seems to be quite difficult though.
 
  For the frameworks--totally agree it would be good to maintain the
  framework support with the project. In some cases there may not be too
  much
  there since the integration gets lighter but I think whatever stubs you
  need should be included. So no I definitely wasn't trying to imply
  dropping
  support for these frameworks, just making the integration lighter by
  separating process management from partition management.
 
  You raise two good points we would have to figure out if we went down
 the
  alignment path:
  1. With respect to the name, yeah I think the first question is 

Re: [Discuss/Vote] upgrade to Yarn 2.6.0

2015-08-19 Thread Roger Hoover
We're using 2.4.0 in production.  Are there any major incompatibilities to
watch out for when upgrading to 2.6.0?

Thanks,

Roger

On Mon, Aug 17, 2015 at 4:41 PM, Yan Fang yanfang...@gmail.com wrote:

 Hi guys,

 we have been discussing upgrading to Yarn 2.6.0 (SAMZA-536
 https://issues.apache.org/jira/browse/SAMZA-536), because there are some
 bug fixes after 2.4.0 and we can not enable the Yarn RM recovering feature
 in Yarn 2.4.0 (SAMZA-750 https://issues.apache.org/jira/browse/SAMZA-750
 )
 .

 So we just want to make sure if any production users are still using Yarn
 2.4.0 and do not plan to upgrade to 2.6.0+?

 If there are no further concerns, I think we can go ahead and upgrade to Yarn 2.6.0 in
 Samza 0.10.0 release.

 Thanks,

 Fang, Yan
 yanfang...@gmail.com



Re: [Discuss/Vote] upgrade to Yarn 2.6.0

2015-08-24 Thread Roger Hoover
Works for me.

Sent from my iPhone

 On Aug 24, 2015, at 7:48 AM, Yan Fang yanfang...@gmail.com wrote:
 
 Hi Roger,
 
 If you have plan to upgrade to 2.6.0, and no other companies are using
 2.4.0, I think we can upgrade to 2.6.0 yarn in 0.10.0.
 
 Thanks,
 
 Fang, Yan
 yanfang...@gmail.com
 
 On Thu, Aug 20, 2015 at 4:48 PM, Yi Pan nickpa...@gmail.com wrote:
 
 Hi, Selina,
 
 Samza 0.9.1 on YARN 2.6 is the proven working solution.
 
 Best,
 
 -Yi
 
 On Thu, Aug 20, 2015 at 12:28 PM, Selina Tech swucaree...@gmail.com
 wrote:
 
 Hi, Yi:
 If I use Samza 0.9.1 and Yarn 2.6.0, will the system fail?
 
 Sincerely,
 Selina
 
 On Wed, Aug 19, 2015 at 1:58 PM, Yi Pan nickpa...@gmail.com wrote:
 
 Hi, Roger,
 
 In LinkedIn we have already moved to YARN 2.6 and is moving to YARN 2.7
 now. I am not aware of any major issues in upgrading. I will let our
 team
 member Jon Bringhurst chime in since he did all the upgrade work and may
 have
 more insights.
 
 @Jon, could you help to comment on this?
 
 Thanks!
 
 -Yi
 
 On Wed, Aug 19, 2015 at 9:12 AM, Roger Hoover roger.hoo...@gmail.com
 wrote:
 
 We're using 2.4.0 in production.  Are there any major
 incompatibilities
 to
 watch out for when upgrading to 2.6.0?
 
 Thanks,
 
 Roger
 
 On Mon, Aug 17, 2015 at 4:41 PM, Yan Fang yanfang...@gmail.com
 wrote:
 
 Hi guys,
 
 we have been discussing upgrading to Yarn 2.6.0 (SAMZA-536
 https://issues.apache.org/jira/browse/SAMZA-536), because there
 are
 some
 bug fixes after 2.4.0 and we can not enable the Yarn RM recovering
 feature
 in Yarn 2.4.0 (SAMZA-750 
 https://issues.apache.org/jira/browse/SAMZA-750
 )
 .
 
 So we just want to make sure if any production users are still
 using
 Yarn
 2.4.0 and do not plan to upgrade to 2.6.0+?
 
 If there are no further concerns, I think we can go ahead and upgrade to Yarn 2.6.0
 in
 Samza 0.10.0 release.
 
 Thanks,
 
 Fang, Yan
 yanfang...@gmail.com
 
 
 
 
 


Re: Use one producer for both coordinator stream and users system?

2015-08-18 Thread Roger Hoover
Hi Yan,

My (uneducated) guess is that the performance gains come from batching.  I
don't know if the new producer ever batches by destination broker.  If not
and it only batches by (broker,topic,partition) then I doubt that one vs
two producers will affect performance as they send to different topics.

Cheers,

Roger

On Tue, Aug 18, 2015 at 12:26 AM, Yan Fang yanfang...@gmail.com wrote:

 Hi Tao,

 First, one kafka producer has an i/o thread. (correct me if I am wrong).

 Second, after Samza 0.10.0, we have a coordinator stream, which stores the
 checkpoint, config and other locality information for auto-scaling, dynamic
 configuration, etc purpose. (See Samza-348
 https://issues.apache.org/jira/browse/SAMZA-348). So we have a producer
 for this coordinator stream.

 Therefore, each container will have at least two producers: one for the
 coordinator stream, one for the user's system.

 My question is: can we use only one producer for both the coordinator stream
 and the user's system to get better performance? (From the doc, it may
 yield better performance.)

 Thanks,

 Fang, Yan
 yanfang...@gmail.com

 On Mon, Aug 17, 2015 at 9:49 PM, Tao Feng fengta...@gmail.com wrote:

  Hi Yan,
 
  Naive question: what do we need the producer thread of the coordinator stream
 for?
 
  Thanks,
  -Tao
 
  On Mon, Aug 17, 2015 at 2:09 PM, Yan Fang yanfang...@gmail.com wrote:
 
   Hi guys,
  
   I have this question because Kafka's doc
   
  
 
 http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
   
   seems to recommend having one producer shared by all threads (*The
   producer is thread safe and should generally be shared among all threads
   for best performance.*), while currently the coordinator stream is using
   a separate producer (usually, there are two producers (two producer
   threads) in each container: one is for the coordinator stream, one is
   for the real job).
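
   i.e., something like this with the new Java producer (a minimal sketch;
   serializers, topic names, and values below are placeholders):

   import java.util.Properties;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerRecord;

   public class SharedProducerSketch {
     public static void main(String[] args) {
       Properties props = new Properties();
       props.put("bootstrap.servers", "broker:9092");
       props.put("key.serializer",
           "org.apache.kafka.common.serialization.ByteArraySerializer");
       props.put("value.serializer",
           "org.apache.kafka.common.serialization.ByteArraySerializer");
       // one shared, thread-safe instance ...
       KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
       byte[] key = "k".getBytes();
       byte[] value = "v".getBytes();
       // ... used by both the coordinator-stream writer and the task output
       producer.send(new ProducerRecord<>("coordinator-stream-topic", key, value));
       producer.send(new ProducerRecord<>("job-output-topic", key, value));
       producer.close();
     }
   }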
  
   1. Will having one producer shared by all threads really improve the
   performance? (haven't done the perf test myself. Guess Kafka has some
   proof).
  
   2. if yes, should we go this way?
  
   Thanks,
  
   Fang, Yan
   yanfang...@gmail.com
  
 



Re: changelog compaction problem

2015-07-29 Thread Roger Hoover
You also may want to check if the cleaner thread in the broker is still
alive (using jstack).  I've run into this issue and used the fix mentioned
in the ticket to get compaction working again.

https://issues.apache.org/jira/browse/KAFKA-1641
I'd just like to mention that a possible workaround (depending on your
situation in regard to keys) is to stop the broker, remove the cleaner
offset checkpoint, and then start the broker again for each ISR member in
serial to get the thread running again. Keep in mind that the cleaner will
start from the beginning if you do this.
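
Concretely, the workaround looks roughly like this per broker, one ISR member
at a time (script and file paths are illustrative; adjust to your install):

bin/kafka-server-stop.sh
# remove the cleaner's checkpoint from each configured log directory
rm /var/kafka-logs/cleaner-offset-checkpoint
bin/kafka-server-start.sh -daemon config/server.properties
# confirm the cleaner thread is alive again
jstack <broker-pid> | grep -i cleaner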

On Wed, Jul 29, 2015 at 8:43 AM, Chinmay Soman chinmay.cere...@gmail.com
wrote:

 Just curious,

 Can you double check if you have log compaction enabled on your Kafka
 brokers?
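
 That is, the broker needs the log cleaner enabled and the changelog topic
 needs a compact cleanup policy; roughly (assumed typical settings):

 # broker: server.properties
 log.cleaner.enable=true

 # changelog topic config
 cleanup.policy=compact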

 On Wed, Jul 29, 2015 at 8:30 AM, Vladimir Lebedev w...@fastmail.fm wrote:

  Hello,
 
  I have a problem: the changelog in one of my samza jobs grows
 indefinitely.
 
  The job is quite simple, it reads messages from the input kafka topic,
 and
  either creates or updates a key in the task-local samza store. Once a
  minute the window method kicks in; it iterates over all keys in the store and
  deletes some of them, selecting on the contents of their value.
 
  Message rate in input topic is about 3000 messages per second. The input
  topic is partitioned in 48 partitions. Average number of keys, kept in
 the
  store is more or less stable and does not exceed 10,000 keys per task.
 Average
  size of values is 50 bytes. So I expected that sum of all segments' size
 in
  kafka data directory for the job's changelog topic should not exceed
  10,000*50*48 ~= 24 Mbytes. In fact it is more than 2.5GB (after 6 days
  running from scratch) and it is growing.
 
  I tried to change the default segment size for the changelog topic in kafka, and
  it worked a bit - instead of 500Mbyte segments I have now 50Mbyte
 segments,
  but it did not heal the indefinite data growth problem.
 
  Moreover, if I stop the job and start it again it cannot restart; it
  breaks right after reading all records from the changelog topic.

  Did somebody have a similar problem? How could it be resolved?
 
  Best regards,
  Vladimir
 
  --
  Vladimir Lebedev
  w...@fastmail.fm
 
 


 --
 Thanks and regards

 Chinmay Soman



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Roger Hoover


 On July 29, 2015, 8:42 a.m., Dan Harvey wrote:
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
   line 116
  https://reviews.apache.org/r/36815/diff/6/?file=1024157#file1024157line116
 
  Should we add a Samza specific message, then add the whole exception? So
  it's more clear what the exception was from if the user doesn't know the
  code? `Logger.info("Failed to index message in ElasticSearch.", e);`
  
  This would also be true for other log lines added.

Good idea.  Thanks.

BTW, it didn't work like this: Logger.info("Failed to index message in
ElasticSearch.", itemResp.getFailure()) so I did this:

LOGGER.error("Failed to index document in Elasticsearch: " +
itemResp.getFailureMessage());


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/#review93413
---


On July 29, 2015, 6:22 a.m., Roger Hoover wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36815/
 ---
 
 (Updated July 29, 2015, 6:22 a.m.)
 
 
 Review request for samza and Dan Harvey.
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-741 Add support for versioning to Elasticsearch System Producer
 
 
 Diffs
 -
 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
  f61bd36 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
  e3b635b 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
  afe0eee 
   
 samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
  980964f 
   
 samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
  684d7f6 
 
 Diff: https://reviews.apache.org/r/36815/diff/
 
 
 Testing
 ---
 
 Refactored DefaultIndexRequestFactory to make it easier to subclass and 
 customize to handle version and version_type parameters.
 
 
 Thanks,
 
 Roger Hoover
 




Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Roger Hoover
Thanks, Yi!

On Wed, Jul 29, 2015 at 12:16 PM, Yi Pan nickpa...@gmail.com wrote:

 Hi, Roger,

 I am testing the patch now. Will update the JIRA soon.

 Thanks!

 -Yi

 On Wed, Jul 29, 2015 at 12:11 PM, Roger Hoover roger.hoo...@gmail.com
 wrote:

  Thank you, Dan.  I think we're ready to merge.  Can one of the Samza
  committers please take a look?
 
  On Wed, Jul 29, 2015 at 11:31 AM, Dan Harvey danharve...@gmail.com
  wrote:
 
  This is an automatically generated e-mail. To reply, visit:
   https://reviews.apache.org/r/36815/
  
   On July 29th, 2015, 8:42 a.m. UTC, *Dan Harvey* wrote:
  
  
  
 
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
   
 
 https://reviews.apache.org/r/36815/diff/6/?file=1024157#file1024157line116
 
  (Diff
   revision 6)
  
   public void register(final String source) {
  
  116
  
 LOGGER.info(itemResp.getFailureMessage());
  
 Should we add a Samza specific message, then add the whole exception? So
 it's more clear what the exception was from if the user doesn't know the
 code? Logger.info("Failed to index message in ElasticSearch.", e);
  
   This would also be true for other log lines added.
  
On July 29th, 2015, 6:22 p.m. UTC, *Roger Hoover* wrote:
  
   Good idea.  Thanks.
  
   BTW, it didn't work like this: Logger.info("Failed to index message in
   ElasticSearch.", itemResp.getFailure()) so I did this:

   LOGGER.error("Failed to index document in Elasticsearch: " +
   itemResp.getFailureMessage());
  
On July 29th, 2015, 6:28 p.m. UTC, *Roger Hoover* wrote:
  
   This is what the messages look like
  
   2015-07-29 11:15:02 ElasticsearchSystemProducer [INFO] Failed to index
  document in Elasticsearch:
  VersionConflictEngineException[[test-embedded.2015-07-29][0] [stuff][d3]:
  version conflict, current [9], provided [5]]
  
That looks fine!
  
  
   - Dan
  
   On July 29th, 2015, 6:24 p.m. UTC, Roger Hoover wrote:
 Review request for samza and Dan Harvey.
   By Roger Hoover.
  
   *Updated July 29, 2015, 6:24 p.m.*
*Repository: * samza
   Description
  
   SAMZA-741 Add support for versioning to Elasticsearch System Producer
  
 Testing
  
   Refactored DefaultIndexRequestFactory to make it easier to subclass and
  customize to handle version and version_type parameters.
  
 Diffs
  
  -
 
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
  (f61bd36)
  -
 
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
  (e3b635b)
  -
 
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
  (afe0eee)
  -
 
 samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
  (980964f)
  -
 
 samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
  (684d7f6)
  
   View Diff https://reviews.apache.org/r/36815/diff/
  
 



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Roger Hoover
Thank you, Dan.  I think we're ready to merge.  Can one of the Samza
committers please take a look?

On Wed, Jul 29, 2015 at 11:31 AM, Dan Harvey danharve...@gmail.com wrote:

This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36815/

 On July 29th, 2015, 8:42 a.m. UTC, *Dan Harvey* wrote:


 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 https://reviews.apache.org/r/36815/diff/6/?file=1024157#file1024157line116 
 (Diff
 revision 6)

 public void register(final String source) {

116

   LOGGER.info(itemResp.getFailureMessage());

   Should we add a Samza specific message, then add the whole exception? So
 it's more clear what the exception was from if the user doesn't know the
 code? Logger.info("Failed to index message in ElasticSearch.", e);

 This would also be true for other log lines added.

  On July 29th, 2015, 6:22 p.m. UTC, *Roger Hoover* wrote:

 Good idea.  Thanks.

 BTW, it didn't work like this: Logger.info("Failed to index message in
 ElasticSearch.", itemResp.getFailure()) so I did this:

 LOGGER.error("Failed to index document in Elasticsearch: " +
 itemResp.getFailureMessage());

  On July 29th, 2015, 6:28 p.m. UTC, *Roger Hoover* wrote:

 This is what the messages look like

 2015-07-29 11:15:02 ElasticsearchSystemProducer [INFO] Failed to index 
 document in Elasticsearch: 
 VersionConflictEngineException[[test-embedded.2015-07-29][0] [stuff][d3]: 
 version conflict, current [9], provided [5]]

  That looks fine!


 - Dan

 On July 29th, 2015, 6:24 p.m. UTC, Roger Hoover wrote:
   Review request for samza and Dan Harvey.
 By Roger Hoover.

 *Updated July 29, 2015, 6:24 p.m.*
  *Repository: * samza
 Description

 SAMZA-741 Add support for versioning to Elasticsearch System Producer

   Testing

 Refactored DefaultIndexRequestFactory to make it easier to subclass and 
 customize to handle version and version_type parameters.

   Diffs

- 
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
(f61bd36)
- 
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
(e3b635b)
- 
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
(afe0eee)
- 
 samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
(980964f)
- 
 samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
(684d7f6)

 View Diff https://reviews.apache.org/r/36815/diff/



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-28 Thread Roger Hoover


 On July 28, 2015, 7:36 a.m., Dan Harvey wrote:
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
   line 115
  https://reviews.apache.org/r/36815/diff/3/?file=1023400#file1023400line115
 
  could switch these around so you've got 
  getStatus().equals(RestStatus.CONFLICT), fewer nots.

Ok. I reworked it to not have nots.  :)


 On July 28, 2015, 7:36 a.m., Dan Harvey wrote:
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
   line 117
  https://reviews.apache.org/r/36815/diff/3/?file=1023400#file1023400line117
 
  I don't think the LOGGER check here is needed?

Yeah, not necessary here.


 On July 28, 2015, 7:36 a.m., Dan Harvey wrote:
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
   line 118
  https://reviews.apache.org/r/36815/diff/3/?file=1023400#file1023400line118
 
  I think it's worth adding a metrics here too so we know how many 
  conflicts occur. Then does the log message from elastic search fit in with 
  the Sazma ones, and is easy to understand?

Added them in the updateSuccessMetrics() method


 On July 28, 2015, 7:36 a.m., Dan Harvey wrote:
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
   line 124
  https://reviews.apache.org/r/36815/diff/3/?file=1023400#file1023400line124
 
  it is odd that a method with the name updateSuccessMetrics returns the
  number of writes? Could we just leave the log line as it was?
  
  or it could compute the difference in the metrics before and after 
  calling updateSuccessMetrics() here? might make more sense?

Leaving it as is seemed misleading since version conflicts are not successfully 
written.  Before/after comparison seems a little messy so I moved the log line 
to the updateSuccessMetrics() method


 On July 28, 2015, 7:36 a.m., Dan Harvey wrote:
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java,
   line 92
  https://reviews.apache.org/r/36815/diff/3/?file=1023402#file1023402line92
 
  I know I set this line before but reading the elasticsearch docs they 
  recommend using  `Requests.indexRequest(index).type(type)`

Sounds good.


 On July 28, 2015, 7:36 a.m., Dan Harvey wrote:
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
   line 139
  https://reviews.apache.org/r/36815/diff/3/?file=1023400#file1023400line139
 
  This already gets logged and thrown in flush(), is this to see the 
  exception sooner?

These are the individual exceptions per document.  They're not being logged in 
the flush() method.  Only batch-level errors are saved and logged in the flush. 
 I wasn't seeing any of the document level errors in the log.


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/#review93249
---


On July 28, 2015, 6:15 a.m., Roger Hoover wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36815/
 ---
 
 (Updated July 28, 2015, 6:15 a.m.)
 
 
 Review request for samza and Dan Harvey.
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-741 Add support for versioning to Elasticsearch System Producer
 
 
 Diffs
 -
 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
  f61bd36 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
  e3b635b 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
  afe0eee 
   
 samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
  980964f 
   
 samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
  684d7f6 
 
 Diff: https://reviews.apache.org/r/36815/diff/
 
 
 Testing
 ---
 
 Refactored DefaultIndexRequestFactory to make it easier to subclass and 
 customize to handle version and version_type parameters.
 
 
 Thanks,
 
 Roger Hoover
 




Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Roger Hoover


 On July 29, 2015, 5:47 a.m., Yi Pan (Data Infrastructure) wrote:
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
   line 149
  https://reviews.apache.org/r/36815/diff/4/?file=1024086#file1024086line149
 
  Quick question: Is it guaranteed that there is no DeleteResponse here? 
  It would be good to at least log a warn if we get an unexpected response 
  here.
 
 Roger Hoover wrote:
 It is guaranteed that you will not get a DeleteResponse back because the 
 producer currently only allows IndexRequests.  In the future, if it supports 
 DeleteRequest then we should add a counter metric for deletes.
 
 Yi Pan (Data Infrastructure) wrote:
 @Roger, thanks for the explanation. My point is: it would be good to make 
 the code detect and log unexpected responses.

Oh, I see.  I just added a warning log to do that.


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/#review93395
---


On July 29, 2015, 6:22 a.m., Roger Hoover wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36815/
 ---
 
 (Updated July 29, 2015, 6:22 a.m.)
 
 
 Review request for samza and Dan Harvey.
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-741 Add support for versioning to Elasticsearch System Producer
 
 
 Diffs
 -
 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
  f61bd36 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
  e3b635b 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
  afe0eee 
   
 samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
  980964f 
   
 samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
  684d7f6 
 
 Diff: https://reviews.apache.org/r/36815/diff/
 
 
 Testing
 ---
 
 Refactored DefaultIndexRequestFactory to make it easier to subclass and 
 customize to handle version and version_type parameters.
 
 
 Thanks,
 
 Roger Hoover
 




Re: [DISCUSS] Release 0.10.0

2015-07-28 Thread Roger Hoover
Thanks, Yi.

I propose that we also include SAMZA-741 for Elasticsearch versioning
support with the new ES producer.  I think it's very close to being merged.

Roger


On Tue, Jul 28, 2015 at 10:08 PM, Yi Pan nickpa...@gmail.com wrote:

 Hi, all,

 I want to start the discussion on the release schedule for 0.10.0. There
 are a few important features that we plan to release in 0.10.0 and I want
 to start this thread s.t. we can agree on what to include in 0.10.0
 release.

 There are the following main features added in 0.10.0:
 - RocksDB TTL support
 - Add CoordinatorStream and disable CheckpointManager
 - Elasticsearch Producer
 - Host affinity
 And other 0.10.0 tickets:

 https://issues.apache.org/jira/issues/?jql=project%20%3D%20SAMZA%20AND%20status%20in%20(%22In%20Progress%22%2C%20Resolved%2C%20Closed)%20AND%20fixVersion%20%3D%200.10.0

 I propose to cut a 0.10.0 release after we get the following issues
 resolved:
 - SAMZA-615: Migrate checkpoint from checkpoint topic to Coordinator stream
 - SAMZA-617: YARN host affinity in Samza

 Thoughts?

 Thanks!

 -Yi



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-28 Thread Roger Hoover


 On July 29, 2015, 5:47 a.m., Yi Pan (Data Infrastructure) wrote:
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
   line 149
  https://reviews.apache.org/r/36815/diff/4/?file=1024086#file1024086line149
 
  Quick question: Is it guaranteed that there is no DeleteResponse here? 
  It would be good to at least log a warn if we get an unexpected response 
  here.

It is guaranteed that you will not get a DeleteResponse back because the 
producer currently only allows IndexRequests.  In the future, if it supports 
DeleteRequest then we should add a counter metric for deletes.


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/#review93395
---


On July 29, 2015, 5:17 a.m., Roger Hoover wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36815/
 ---
 
 (Updated July 29, 2015, 5:17 a.m.)
 
 
 Review request for samza and Dan Harvey.
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-741 Add support for versioning to Elasticsearch System Producer
 
 
 Diffs
 -
 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
  f61bd36 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
  e3b635b 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
  afe0eee 
   
 samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
  980964f 
   
 samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
  684d7f6 
 
 Diff: https://reviews.apache.org/r/36815/diff/
 
 
 Testing
 ---
 
 Refactored DefaultIndexRequestFactory to make it easier to subclass and 
 customize to handle version and version_type parameters.
 
 
 Thanks,
 
 Roger Hoover
 




Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/
---

(Updated July 29, 2015, 6:22 a.m.)


Review request for samza and Dan Harvey.


Changes
---

Making the invalid response type an error level log message


Repository: samza


Description
---

SAMZA-741 Add support for versioning to Elasticsearch System Producer


Diffs (updated)
-

  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 f61bd36 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 e3b635b 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
 afe0eee 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
 980964f 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
 684d7f6 

Diff: https://reviews.apache.org/r/36815/diff/


Testing
---

Refactored DefaultIndexRequestFactory to make it easier to subclass and 
customize to handle version and version_type parameters.


Thanks,

Roger Hoover



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-28 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/
---

(Updated July 28, 2015, 6:13 a.m.)


Review request for samza.


Changes
---

Ignoring version conflicts


Repository: samza


Description
---

SAMZA-741 Add support for versioning to Elasticsearch System Producer


Diffs (updated)
-

  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 f61bd36 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 e3b635b 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
 afe0eee 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
 980964f 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
 684d7f6 

Diff: https://reviews.apache.org/r/36815/diff/


Testing
---

Refactored DefaultIndexRequestFactory to make it easier to subclass and 
customize to handle version and version_type parameters.


Thanks,

Roger Hoover



Re: Kafka broker error from samza producer

2015-07-23 Thread Roger Hoover
I forgot to mention that for me the error always happened after restarting a 
broker.

Sent from my iPhone

 On Jul 23, 2015, at 4:25 PM, Jordan Shaw jor...@pubnub.com wrote:
 
 Hey Roger,
 I restarted the producer and the error went away on the broker. If it comes
 back I'll switch over to lz4. Thanks for the reply.
 -Jordan
 
 On Thu, Jul 23, 2015 at 9:32 AM, Roger Hoover roger.hoo...@gmail.com
 wrote:
 
 Hi Jordan,
 
  I ran into a similar issue when using snappy compression and the new
 producer.   If you disable compression or switch to lz4 or gzip, does the
 issue go away?
 
 Cheers,
 
 Roger
 
 On Wed, Jul 22, 2015 at 11:54 PM, Jordan Shaw jor...@pubnub.com wrote:
 
 Hey Everyone,
 I'm getting an:
 kafka.message.InvalidMessageException: Message found with corrupt size
 (0)
 
 in my kafka server.log here is the full stack trace:
 https://gist.github.com/jshaw86/516cf47b6fd7559e7dc1.
 
  It indicates that the error is caused by a bad produce call from the samza
  producer. Any idea what could be causing this? Just about the only thing
  that I can find is maybe an issue with snappy or compression but I don't see
  a snappy call in the traceback.
 
 --
 Jordan Shaw
 Full Stack Software Engineer
 PubNub Inc
 
 
 
 -- 
 Jordan Shaw
 Full Stack Software Engineer
 PubNub Inc
 1045 17th St
 San Francisco, CA 94107


Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-22 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/
---

(Updated July 22, 2015, 5 p.m.)


Review request for samza.


Repository: samza


Description
---

SAMZA-733 Add metrics to Elasticsearch System Producer


Diffs (updated)
-

  samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java 
PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala 
8eac8ef 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
 a277b69 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 7eb14a2 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 PRE-CREATION 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
 PRE-CREATION 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
 e63d62c 

Diff: https://reviews.apache.org/r/36473/diff/


Testing
---

Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
stream and that the metrics correctly count how many Elasticsearch documents 
were created and indexed.


Thanks,

Roger Hoover



Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-21 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/
---

(Updated July 22, 2015, 4:07 a.m.)


Review request for samza.


Repository: samza


Description
---

SAMZA-733 Add metrics to Elasticsearch System Producer


Diffs (updated)
-

  samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java 
PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala 
8eac8ef 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
 a277b69 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 7eb14a2 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 PRE-CREATION 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
 PRE-CREATION 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
 e63d62c 

Diff: https://reviews.apache.org/r/36473/diff/


Testing
---

Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
stream and that the metrics correctly count how many Elasticsearch documents 
were created and indexed.


Thanks,

Roger Hoover



Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-21 Thread Roger Hoover


 On July 21, 2015, 5:42 p.m., Yan Fang wrote:
  samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala, 
  lines 36-37
  https://reviews.apache.org/r/36473/diff/3/?file=1017436#file1017436line36
 
  though it works, prefer to use the def here, not only because it has 
  less overhead, but also keeps all the methods consistent for better 
  readability. What do you think?
 
 Roger Hoover wrote:
 Sounds good.  I only balked at it the first time because I'm not that 
 skilled with Scala type declarations yet. :)  I can make this work.

I take it back.  It seems it [can't be 
done](http://www.scala-lang.org/old/node/5082)


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review92429
---


On July 21, 2015, 5:41 a.m., Roger Hoover wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36473/
 ---
 
 (Updated July 21, 2015, 5:41 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-733 Add metrics to Elasticsearch System Producer
 
 
 Diffs
 -
 
   samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java 
 PRE-CREATION 
   samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java 
 PRE-CREATION 
   samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala 
 8eac8ef 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
  a277b69 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
  7eb14a2 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
  PRE-CREATION 
   
 samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
  PRE-CREATION 
   
 samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
  e63d62c 
 
 Diff: https://reviews.apache.org/r/36473/diff/
 
 
 Testing
 ---
 
 Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
 stream and that the metrics correctly count how many Elasticsearch documents 
 were created and indexed.
 
 
 Thanks,
 
 Roger Hoover
 




Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-25 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/
---

(Updated July 25, 2015, 4:48 p.m.)


Review request for samza.


Repository: samza


Description
---

SAMZA-741 Add support for versioning to Elasticsearch System Producer


Diffs (updated)
-

  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
 afe0eee 

Diff: https://reviews.apache.org/r/36815/diff/


Testing
---

Refactored DefaultIndexRequestFactory to make it easier to subclass and 
customize to handle version and version_type parameters.


Thanks,

Roger Hoover



Re: Kafka broker error from samza producer

2015-07-23 Thread Roger Hoover
Hi Jordan,

I ran into a similar issue when using snappy compression and the new
producer.   If you disable compression or switch to lz4 or gzip, does the
issue go away?
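
For reference, a minimal way to test this (assuming the job uses the new Kafka
producer, whose configs Samza passes through under systems.<system>.producer.*):

```
# switch the producer codec from snappy to lz4 (or gzip, or none) to test
systems.kafka.producer.compression.type=lz4
```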

Cheers,

Roger

On Wed, Jul 22, 2015 at 11:54 PM, Jordan Shaw jor...@pubnub.com wrote:

 Hey Everyone,
 I'm getting an:
  kafka.message.InvalidMessageException: Message found with corrupt size
 (0)

 in my kafka server.log here is the full stack trace:
 https://gist.github.com/jshaw86/516cf47b6fd7559e7dc1.

  It indicates that the error is caused by a bad produce call from the samza
  producer. Any idea what could be causing this? Just about the only thing
  that I can find is maybe an issue with snappy or compression but I don't see
  a snappy call in the traceback.

 --
 Jordan Shaw
 Full Stack Software Engineer
 PubNub Inc



How to map document version to the Elasticsearch System Producer?

2015-07-23 Thread Roger Hoover
Hi Dan and Samza devs,

I have a use case for which I need to set an external version on
Elasticsearch documents.  Versioning (
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#index-versioning)
lets you prevent duplicate messages from temporarily overwriting new
versions of a document with old ones.

Currently, the Elasticsearch system producer does not support setting
versions.  Since Kafka/Samza don't have support for key/value headers in
messages, I think the best approach is to embed metadata into the stream
name.

We can add a version and version_type as options to the stream name.  These
match up with Elasticsearch REST API (
https://www.elastic.co/blog/elasticsearch-versioning-support)

{index-name}/{type-name}?version={version-id}&version_type={version-type}
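
As a hypothetical example of the convention (the "es" system name and the
index/type here are placeholders; the per-message version would come from the
message itself):

```java
// The producer would parse the options off the stream name when building
// each IndexRequest.
collector.send(new OutgoingMessageEnvelope(
    new SystemStream("es", "orders/order?version_type=external"),
    orderId,            // used as the Elasticsearch document id
    orderJsonBytes));   // document body
```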

I've created a JIRA (https://issues.apache.org/jira/browse/SAMZA-741).  I'd
appreciate your feedback.

Thanks,

Roger


Metrics for Elasticsearch System Producer

2015-07-14 Thread Roger Hoover
Hi all,

I've started using the new Elasticsearch System Producer (many thanks,
Dan!) and decided to add some metrics to it.

The JIRA ticket and review request links are here:

https://issues.apache.org/jira/browse/SAMZA-733

https://reviews.apache.org/r/36473/

Cheers,

Roger


Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-14 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/
---

Review request for samza.


Repository: samza


Description
---

SAMZA-733 Add metrics to Elasticsearch System Producer


Diffs
-

  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
 a277b69 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 7eb14a2 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 PRE-CREATION 

Diff: https://reviews.apache.org/r/36473/diff/


Testing
---

Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
stream and that the metrics correctly count how many Elasticsearch documents 
were created and indexed.


Thanks,

Roger Hoover



Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-14 Thread Roger Hoover


 On July 14, 2015, 9:44 p.m., Yan Fang wrote:
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java,
   line 24
  https://reviews.apache.org/r/36473/diff/1/?file=1010788#file1010788line24
 
  can this class extend MetricsHelper? This can simplify things a little.

I don't see how it simplifies things because I have to implement all the 
methods in the Scala trait.  I'm having trouble getting the newGauge signatures 
to match.

```
public class ElasticsearchSystemProducerMetrics implements MetricsHelper {
    public final Counter bulkSendSuccess;
    public final Counter inserts;
    public final Counter updates;
    private final MetricsRegistry registry;
    private final String group;
    private final String systemName;

    public interface JFunction<R> {
        R apply();
    }

    public ElasticsearchSystemProducerMetrics(String systemName,
            MetricsRegistry registry) {
        group = this.getClass().getName();
        this.registry = registry;
        this.systemName = systemName;

        bulkSendSuccess = newCounter("bulk-send-success");
        inserts = newCounter("docs-inserted");
        updates = newCounter("docs-updated");
    }

    @Override
    public Counter newCounter(String name) {
        return MetricsHelper$class.newCounter(this, name);
    }

    @Override
    public <T> Gauge<T> newGauge(String name, T value) {
        return MetricsHelper$class.newGauge(this, name, value);
    }

    @Override
    public <T> Gauge<T> newGauge(String name, JFunction<T> value) {
        return null;
    }

    @Override
    public Timer newTimer(String name) {
        return MetricsHelper$class.newTimer(this, name);
    }

    @Override
    public String getPrefix() {
        return systemName + "-";
    }

    @Override
    public MetricsRegistry registry() {
        return registry;
    }

    @Override
    public String group() {
        return group;
    }
}
```


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review91670
---


On July 14, 2015, 6:12 a.m., Roger Hoover wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36473/
 ---
 
 (Updated July 14, 2015, 6:12 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-733 Add metrics to Elasticsearch System Producer
 
 
 Diffs
 -
 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
  a277b69 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
  7eb14a2 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
  PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/36473/diff/
 
 
 Testing
 ---
 
 Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
 stream and that the metrics correctly count how many Elasticsearch documents 
 were created and indexed.
 
 
 Thanks,
 
 Roger Hoover
 




Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-14 Thread Roger Hoover


 On July 14, 2015, 9:44 p.m., Yan Fang wrote:
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
   line 152
  https://reviews.apache.org/r/36473/diff/1/?file=1010787#file1010787line152
 
  remove the space

Sure thing.


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review91670
---


On July 14, 2015, 6:12 a.m., Roger Hoover wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36473/
 ---
 
 (Updated July 14, 2015, 6:12 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-733 Add metrics to Elasticsearch System Producer
 
 
 Diffs
 -
 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
  a277b69 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
  7eb14a2 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
  PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/36473/diff/
 
 
 Testing
 ---
 
 Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
 stream and that the metrics correctly count how many Elasticsearch documents 
 were created and indexed.
 
 
 Thanks,
 
 Roger Hoover
 




Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-14 Thread Roger Hoover


 On July 14, 2015, 9:44 p.m., Yan Fang wrote:
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java,
   line 24
  https://reviews.apache.org/r/36473/diff/1/?file=1010788#file1010788line24
 
  can this class extend MetricsHelper? This can simplify things a little.
 
 Roger Hoover wrote:
 I don't see how it simplifies things because I have to implement all the 
 methods in the Scala trait.  I'm having trouble getting the newGauge 
 signatures to match.
 
 ```
 public class ElasticsearchSystemProducerMetrics implements MetricsHelper {
     public final Counter bulkSendSuccess;
     public final Counter inserts;
     public final Counter updates;
     private final MetricsRegistry registry;
     private final String group;
     private final String systemName;
 
     public interface JFunction<R> {
         R apply();
     }
 
     public ElasticsearchSystemProducerMetrics(String systemName,
             MetricsRegistry registry) {
         group = this.getClass().getName();
         this.registry = registry;
         this.systemName = systemName;
 
         bulkSendSuccess = newCounter("bulk-send-success");
         inserts = newCounter("docs-inserted");
         updates = newCounter("docs-updated");
     }
 
     @Override
     public Counter newCounter(String name) {
         return MetricsHelper$class.newCounter(this, name);
     }
 
     @Override
     public <T> Gauge<T> newGauge(String name, T value) {
         return MetricsHelper$class.newGauge(this, name, value);
     }
 
     @Override
     public <T> Gauge<T> newGauge(String name, JFunction<T> value) {
         return null;
     }
 
     @Override
     public Timer newTimer(String name) {
         return MetricsHelper$class.newTimer(this, name);
     }
 
     @Override
     public String getPrefix() {
         return systemName + "-";
     }
 
     @Override
     public MetricsRegistry registry() {
         return registry;
     }
 
     @Override
     public String group() {
         return group;
     }
 }
 ```

We really only need counters for this class, but we'd have to figure out how to 
implement the Scala newGauge methods, which are tricky.  I'd appreciate help 
if you know how to do it.
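
One possible escape hatch is to skip the trait entirely and build a small Java
base class directly on MetricsRegistry, along these lines (a sketch using the
registry's group/name signatures; roughly the direction the MetricsBase class
in the next revision takes):

```java
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.Timer;

// Java-friendly helper: delegate straight to the registry instead of
// implementing the Scala MetricsHelper trait.
public abstract class JavaMetricsBase {
    private final MetricsRegistry registry;
    private final String group = this.getClass().getName();

    protected JavaMetricsBase(MetricsRegistry registry) {
        this.registry = registry;
    }

    // Subclasses can prefix metric names, e.g. with the system name.
    protected String getPrefix() {
        return "";
    }

    public Counter newCounter(String name) {
        return registry.newCounter(group, getPrefix() + name);
    }

    public <T> Gauge<T> newGauge(String name, T value) {
        return registry.newGauge(group, getPrefix() + name, value);
    }

    public Timer newTimer(String name) {
        return registry.newTimer(group, getPrefix() + name);
    }
}
```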


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review91670
---


On July 14, 2015, 6:12 a.m., Roger Hoover wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36473/
 ---
 
 (Updated July 14, 2015, 6:12 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-733 Add metrics to Elasticsearch System Producer
 
 
 Diffs
 -
 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
  a277b69 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
  7eb14a2 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
  PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/36473/diff/
 
 
 Testing
 ---
 
 Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
 stream and that the metrics correctly count how many Elasticsearch documents 
 were created and indexed.
 
 
 Thanks,
 
 Roger Hoover
 




Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-20 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/
---

(Updated July 21, 2015, 5:41 a.m.)


Review request for samza.


Changes
---

Fixed tests and added base class for Java metrics


Repository: samza


Description
---

SAMZA-733 Add metrics to Elasticsearch System Producer


Diffs (updated)
-

  samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java 
PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala 
8eac8ef 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
 a277b69 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 7eb14a2 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 PRE-CREATION 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
 PRE-CREATION 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
 e63d62c 

Diff: https://reviews.apache.org/r/36473/diff/


Testing
---

Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
stream and that the metrics correctly count how many Elasticsearch documents 
were created and indexed.


Thanks,

Roger Hoover



Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-14 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/
---

(Updated July 15, 2015, 4:45 a.m.)


Review request for samza.


Changes
---

Removed extra space


Repository: samza


Description
---

SAMZA-733 Add metrics to Elasticsearch System Producer


Diffs (updated)
-

  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
 a277b69 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 7eb14a2 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 PRE-CREATION 

Diff: https://reviews.apache.org/r/36473/diff/


Testing
---

Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
stream and that the metrics correctly count how many Elasticsearch documents 
were created and indexed.


Thanks,

Roger Hoover



Snapshot metrics stop getting scheduled if an exception occurs

2015-10-26 Thread Roger Hoover
Hi Samza devs,

I ran into an issue with Samza 0.9.1 where I had a serialization exception
thrown in the MetricsSnapshotReporter.  It's very hard to find because
nothing is logged and the metrics just stop getting scheduled.  Samza
should catch all exceptions in that thread, log them, and suppress them
rather than no longer scheduling snapshot metrics at all.
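
For context, this is a standard ScheduledExecutorService gotcha: one uncaught
exception from the task silently cancels all future runs. A minimal
illustration of the fix (not the actual Samza code):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SafeSnapshotLoop {
    public static void main(String[] args) {
        ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
        // Without the try/catch, the first exception would end the schedule
        // with no log output, which is exactly the symptom described above.
        executor.scheduleAtFixedRate(() -> {
            try {
                System.out.println("reporting metrics snapshot");
            } catch (Exception e) {
                System.err.println("snapshot failed, will retry: " + e);
            }
        }, 0, 60, TimeUnit.SECONDS);
    }
}
```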

JIRA filed here:
https://issues.apache.org/jira/browse/SAMZA-801

Cheers,

Roger


Re: Checkpoint tool not working

2015-10-30 Thread Roger Hoover
I tried it once with 0.9.1 and it didn't work for me either.  I didn't have
time to examine it more carefully at the time.

Roger

On Thu, Oct 29, 2015 at 10:05 PM, Lukas Steiblys 
wrote:

> I'm using Samza 0.9.1.
>
> Lukas
>
> On 10/29/15, Yi Pan  wrote:
> > Hi, Lukas,
> >
> > Which version of checkpoint-tool are you using?
> >
> > -Yi
> >
> > On Thu, Oct 29, 2015 at 5:39 PM, Lukas Steiblys 
> > wrote:
> >
> >> Hello,
> >>
> >> I’m trying to write the checkpoints for a Samza task supplying these
> >> arguments to the checkpoint tool:
> >>
> >> bin/checkpoint-tool.sh
> >> --new-offsets=file:///checkpoints/client-metrics.properties
> >> --config-path=file:///checkpoints/task.properties
> >>
> >> However, it doesn’t actually write the checkpoints and, instead, prints
> >> out the current checkpoints.
> >>
> >> Is the tool currently broken? A sample of the client-metrics.properties
> >> file:
> >>
> >> tasknames.Partition 13.systems.kafka.streams.Sessions.partitions.13 =
> >> 723
> >> tasknames.Partition 14.systems.kafka.streams.Sessions.partitions.14 =
> >> 14589258
> >> tasknames.Partition
> >> 15.systems.kafka.streams.Sessions.partitions.15=10886881
> >>
> >> Lukas
> >
>


Re: Samza and KStreams (KIP-28): LinkedIn's POV

2015-10-05 Thread Roger Hoover
Great.  Thanks, Yi.

On Mon, Oct 5, 2015 at 10:25 AM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Roger,
>
>
> On Sat, Oct 3, 2015 at 11:13 AM, Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>
> > As previously discussed, the biggest request I
> > have is being able to run Samza without YARN, under something like
> > Kubernetes instead.
> >
> >
> Totally. We will be actively working on the standalone Samza after the
> upcoming 0.10 release.
>
>
> > Also, I'm curious.  What's the current state of the Samza SQL physical
> > operators?  Are they used in production yet?  Is there a doc on how to
> use
> > them?
> >
> >
> The current physical operators code now lives in samza-sql branch. There
> are still two big pending check-ins in-review right now, one to stabilize
> the operators' APIs, the other to implement the window operator. We are
> planning to finish the prototype in Q4.
>
> Regards,
>
> -Yi
>


Re: Thoughts and observations on Samza

2015-07-10 Thread Roger Hoover
That would be great to let Kafka do as much heavy lifting as possible and
make it easier for other languages to implement Samza apis.

One thing to watch out for is the interplay between Kafka's group
management and the external scheduler/process manager's fault tolerance.
If a container dies, the Kafka group membership protocol will try to assign
it's tasks to other containers while at the same time the process manager
is trying to relaunch the container.  Without some consideration for this
(like a configurable amount of time to wait before Kafka alters the group
membership), there may be thrashing going on which is especially bad for
containers with large amounts of local state.
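
As a strawman, that wait could be a group-protocol setting on the consumer
side, e.g. (property name from the new Kafka consumer config; the right value
depends on how quickly the process manager restarts containers):

```
# give the process manager time to relaunch a dead container before the
# group reassigns its tasks (trade-off: slower detection of real failures)
session.timeout.ms=60000
```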

Someone else pointed this out already but I thought it might be worth
calling out again.

Cheers,

Roger


On Tue, Jul 7, 2015 at 11:35 AM, Jay Kreps j...@confluent.io wrote:

 Hey Roger,

 I couldn't agree more. We spent a bunch of time talking to people and that
 is exactly the stuff we heard time and again. What makes it hard, of
 course, is that there is some tension between compatibility with what's
 there now and making things better for new users.

 I also strongly agree with the importance of multi-language support. We are
 talking now about Java, but for application development use cases people
 want to work in whatever language they are using elsewhere. I think moving
 to a model where Kafka itself does the group membership, lifecycle control,
 and partition assignment has the advantage of putting all that complex
 stuff behind a clean api that the clients are already going to be
 implementing for their consumer, so the added functionality for stream
 processing beyond a consumer becomes very minor.

 -Jay

 On Tue, Jul 7, 2015 at 10:49 AM, Roger Hoover roger.hoo...@gmail.com
 wrote:

  Metamorphosis...nice. :)
 
  This has been a great discussion.  As a user of Samza who's recently
  integrated it into a relatively large organization, I just want to add
  support to a few points already made.
 
  The biggest hurdles to adoption of Samza as it currently exists that I've
  experienced are:
  1) YARN - YARN is overly complex in many environments where Puppet would
 do
  just fine but it was the only mechanism to get fault tolerance.
  2) Configuration - I think I like the idea of configuring most of the job
  in code rather than config files.  In general, I think the goal should be
  to make it harder to make mistakes, especially of the kind where the code
  expects something and the config doesn't match.  The current config is
  quite intricate and error-prone.  For example, the application logic may
  depend on bootstrapping a topic but rather than asserting that in the
 code,
  you have to rely on getting the config right.  Likewise with serdes, the
  Java representations produced by various serdes (JSON, Avro, etc.) are
 not
  equivalent so you cannot just reconfigure a serde without changing the
  code.   It would be nice for jobs to be able to assert what they expect
  from their input topics in terms of partitioning.  This is getting a
 little
  off topic but I was even thinking about creating a Samza config linter
  that would sanity check a set of configs.  Especially in organizations
  where config is managed by a different team than the application
 developer,
  it's very hard to avoid config mistakes.
  3) Java/Scala centric - for many teams (especially DevOps-type folks),
 the
  pain of the Java toolchain (maven, slow builds, weak command line
 support,
  configuration over convention) really inhibits productivity.  As more and
  more high-quality clients become available for Kafka, I hope they'll
 follow
  Samza's model.  Not sure how much it affects the proposals in this thread
  but please consider other languages in the ecosystem as well.  From what
  I've heard, Spark has more Python users than Java/Scala.
  (FYI, we added a Jython wrapper for the Samza API
 
 
 https://github.com/Quantiply/rico/tree/master/jython/src/main/java/com/quantiply/samza
  and are working on a Yeoman generator
  https://github.com/Quantiply/generator-rico for Jython/Samza projects to
  alleviate some of the pain)
 
  I also want to underscore Jay's point about improving the user
 experience.
  That's a very important factor for adoption.  I think the goal should be
 to
  make Samza as easy to get started with as something like Logstash.
  Logstash is vastly inferior in terms of capabilities to Samza but it's
 easy
  to get started and that makes a big difference.
 
  Cheers,
 
  Roger
 
 
 
 
 
  On Tue, Jul 7, 2015 at 3:29 AM, Gianmarco De Francisci Morales 
  g...@apache.org wrote:
 
   Forgot to add. On the naming issues, Kafka Metamorphosis is a clear
  winner
   :)
  
   --
   Gianmarco
  
   On 7 July 2015 at 13:26, Gianmarco De Francisci Morales 
 g...@apache.org
  
   wrote:
  
Hi,
   
@Martin, thanks for you comments.
Maybe I'm missing some important point, but I think coupling the
  releases

Re: Thoughts and observations on Samza

2015-07-10 Thread Roger Hoover
That's great.  Thanks, Jay.

On Fri, Jul 10, 2015 at 8:46 AM, Jay Kreps j...@confluent.io wrote:

 Yeah totally agree. I think you have this issue even today, right? I.e. if
 you need to make a simple config change and you're running in YARN today
 you end up bouncing the job which then rebuilds state. I think the fix is
 exactly what you described which is to have a long timeout on partition
 movement for stateful jobs so that if a job is just getting bounced, and
 the cluster manager (or admin) is smart enough to restart it on the same
 host when possible, it can optimistically reuse any existing state it finds
 on disk (if it is valid).

 So in this model the charter of the CM is to place processes as stickily as
 possible and to restart or re-place failed processes. The charter of the
 partition management system is to control the assignment of work to these
 processes. The nice thing about this is that the work assignment, timeouts,
 behavior, configs, and code will all be the same across all cluster
 managers.

 So I think that prototype would actually give you exactly what you want
 today for any cluster manager (or manual placement + restart script) that
 was sticky in terms of host placement since there is already a configurable
 partition movement timeout and task-by-task state reuse with a check on
 state validity.

 -Jay

 On Fri, Jul 10, 2015 at 8:34 AM, Roger Hoover roger.hoo...@gmail.com
 wrote:

  That would be great to let Kafka do as much heavy lifting as possible and
  make it easier for other languages to implement Samza apis.
 
  One thing to watch out for is the interplay between Kafka's group
  management and the external scheduler/process manager's fault tolerance.
  If a container dies, the Kafka group membership protocol will try to
 assign
   its tasks to other containers while at the same time the process manager
  is trying to relaunch the container.  Without some consideration for this
  (like a configurable amount of time to wait before Kafka alters the group
  membership), there may be thrashing going on which is especially bad for
  containers with large amounts of local state.
 
  Someone else pointed this out already but I thought it might be worth
  calling out again.
 
  Cheers,
 
  Roger
 
 
  On Tue, Jul 7, 2015 at 11:35 AM, Jay Kreps j...@confluent.io wrote:
 
   Hey Roger,
  
   I couldn't agree more. We spent a bunch of time talking to people and
  that
   is exactly the stuff we heard time and again. What makes it hard, of
   course, is that there is some tension between compatibility with what's
   there now and making things better for new users.
  
   I also strongly agree with the importance of multi-language support. We
  are
   talking now about Java, but for application development use cases
 people
   want to work in whatever language they are using elsewhere. I think
  moving
   to a model where Kafka itself does the group membership, lifecycle
  control,
   and partition assignment has the advantage of putting all that complex
   stuff behind a clean api that the clients are already going to be
   implementing for their consumer, so the added functionality for stream
   processing beyond a consumer becomes very minor.
  
   -Jay
  
   On Tue, Jul 7, 2015 at 10:49 AM, Roger Hoover roger.hoo...@gmail.com
   wrote:
  
Metamorphosis...nice. :)
   
This has been a great discussion.  As a user of Samza who's recently
integrated it into a relatively large organization, I just want to
 add
support to a few points already made.
   
The biggest hurdles to adoption of Samza as it currently exists that
  I've
experienced are:
1) YARN - YARN is overly complex in many environments where Puppet
  would
   do
just fine but it was the only mechanism to get fault tolerance.
2) Configuration - I think I like the idea of configuring most of the
  job
in code rather than config files.  In general, I think the goal
 should
  be
to make it harder to make mistakes, especially of the kind where the
  code
expects something and the config doesn't match.  The current config
 is
quite intricate and error-prone.  For example, the application logic
  may
depend on bootstrapping a topic but rather than asserting that in the
   code,
you have to rely on getting the config right.  Likewise with serdes,
  the
Java representations produced by various serdes (JSON, Avro, etc.)
 are
   not
equivalent so you cannot just reconfigure a serde without changing
 the
code.   It would be nice for jobs to be able to assert what they
 expect
from their input topics in terms of partitioning.  This is getting a
   little
off topic but I was even thinking about creating a Samza config
  linter
that would sanity check a set of configs.  Especially in
 organizations
where config is managed by a different team than the application
   developer,
 it's very hard to avoid config mistakes.
3) Java/Scala centric

Re: Sample code or tutorial for writing/reading Avro type message in Samza

2015-11-17 Thread Roger Hoover
Hi Selina,

If you want to use Confluent's schema registry for Avro, then I have an
example in this repo:

https://github.com/theduderog/hello-samza-confluent
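
If you don't need the registry, a plain Avro round-trip (independent of that
repo) looks roughly like this, e.g. inside a custom Samza Serde:

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroBytes {
    // GenericRecord -> byte[]
    public static byte[] serialize(GenericRecord record) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(record.getSchema())
            .write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    // byte[] -> GenericRecord (you need the writer's schema)
    public static GenericRecord deserialize(byte[] bytes, Schema schema)
            throws Exception {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return new GenericDatumReader<GenericRecord>(schema)
            .read(null, decoder);
    }
}
```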

Cheers,

Roger

On Tue, Nov 17, 2015 at 12:32 AM, Selina Tech  wrote:

> Dear All:
>  Do you know where I can find the tutorial or sample code for writing
> Avro type message to Kafka and reading Avro type message from Kafka in
> Samza?
>   I am wondering how I should serialize GenericRecord to bytes and
> deserialize it?
>  Your comments/suggestion are highly appreciated.
>
> Sincerely,
> Selina
>


Re: New Samza blog published - http://engineering.linkedin.com/performance/benchmarking-apache-samza-12-million-messages-second-single-node

2015-08-25 Thread Roger Hoover
Thanks for sharing!

Tao, did you use YARN to run 15 containers or is there a way to have them
statically divide up the tasks?



On Mon, Aug 24, 2015 at 12:03 PM, Ed Yakabosky 
eyakabo...@linkedin.com.invalid wrote:

 Hi Samza open source,

 I want to share that Tao Feng
 https://www.linkedin.com/pub/tao-feng/14/958/171 (from LinkedIn's
 Performance Team) has published a blog post
 
 http://engineering.linkedin.com/performance/benchmarking-apache-samza-12-million-messages-second-single-node
 
 on
 Samza perf benchmarks in collaboration with our development team.  A lot of
 hard work went into this blog post so please join me in congratulating Tao
 and other contributors, and please share to your Big Data social media
 circles.

 Synopsis: *The objective of the blog is to measure Samza's performance in
 terms of the message-processing rate for a single machine for typical Samza
 use cases. This will help engineers to understand and to optimize
 performance and provide a basis for establishing a capacity model to run
 Samza platform as a service.*

 Thanks,
 Ed Yakabosky
 Streams Infrastructure TPM, LinkedIn



Re: Samza and KStreams (KIP-28): LinkedIn's POV

2015-10-03 Thread Roger Hoover
Hi Yi,

Thank you for sharing this update and perspective.  I tend to agree that
for simple, stateless cases, things could be easier and hopefully KStreams
may help with that.  I also appreciate a lot of features that Samza already
supports for operations.  As previously discussed, the biggest request I
have is being able to run Samza without YARN, under something like
Kubernetes instead.

Also, I'm curious.  What's the current state of the Samza SQL physical
operators?  Are they used in production yet?  Is there a doc on how to use
them?

Thanks,

Roger



On Fri, Oct 2, 2015 at 1:54 PM, Yi Pan  wrote:

> Hi, all Samza-lovers,
>
> This question on the relationship of Kafka KStream (KIP-28) and Samza has
> come up a couple times recently. So we wanted to clarify where we stand at
> LinkedIn in terms of this discussion.
>
> Samza has historically had a symbiotic relationship with Kafka and will
> continue to work very well with Kafka.  Earlier in the year, we had an
> in-depth discussion exploring an even deeper integration with Kafka.  After
> hitting multiple practical issues (e.g. Apache rules) and technical issues
> we had to give up on that idea. As a fallout of the discussion, the Kafka
> community is adding some of the basic event processing capabilities into
> Kafka core directly. The basic callback/push style programming model by
> itself is a great addition to the Kafka API set.
>
> However at LinkedIn, we continue to be firmly committed to Samza as our
> stream processing framework. Although KStream is a nice addition to Kafka
> stack, our goals for Samza are broader. There are some key technical
> differences that makes Samza the right strategy for us.
>
> 1.  Support for non-kafka systems :
>
> At LinkedIn a larger percentage of our streaming jobs use Databus as an
> input source.   For any such non-Kafka source, although the CopyCat
> connector framework gives a common model for pulling data out of a source
> and pushing it into Kafka, it introduces yet another piece of
> infrastructure that we have to operate and manage.  Also for any companies
> who are already on AWS, Google Compute, Azure etc.  asking them to deploy
> and operate kafka in AWS instead of using the natively supported services
> like Kinesis, Google Cloud pub-sub, etc. etc. can potentially be a
> non-starter.  With more acquisitions at LinkedIn that use AWS we are also
> facing this first hand.  The Samza community has a healthy set of system
> producers/consumers which are in the works (Kinesis, ActiveMQ,
> ElasticSearch, HDFS, etc.).
>
> 2. We run Samza as a Stream Processing Service at LinkedIn. This is
> fundamentally different from KStream.
>
> This is similar to AWS Lambda and Google Cloud Dataflow, Azure Stream
> Insight and similar services.  The service makes it much easier for
> developers to get their stream processing jobs up and running in production
> by helping with the most common problems like monitoring, dashboards,
> auto-scale, rolling upgrades and such.
>
> The truth is that if the stream processing application is stateless then
> some of these common problems are not as involved and can be solved even on
> regular IaaS platforms like EC2 and such.   Arguably stateless applications
> can be built easily on top of the native APIs from the input source like
> kafka, kinesis etc.
>
> The place where Samza shines is with stateful stream processing
> applications.  When a Samza application uses the local rocks DB based
> state, the application needs special care in terms of rolling upgrades,
> addition/removal of containers/machines, temporary machine failures,
> capacity management.  We have already done some really good work in Samza
> 0.10 to make sure that we don't reseed the state from kafka (i.e.
> host-affinity feature that allows to reuse the local states).  In the
> absence of this feature, we had multiple production issues caused due to
> network saturation during state reseeding from kafka.   The problems with
> stateful applications are similar to problems encountered when building
> no-sql databases and other data systems.
>
> There are surely some scenarios where customers don't want any YARN
> dependency and want to run their stream processing application on a
> dedicated set of nodes.  This is where KStream clearly has an advantage
> over current Samza. Prior to KStream we had a patch in Samza which also
> solved the same problem (SAMZA-516). We do expect to finish this patch soon
> and formally support Stand Alone Samza.
>
> 3. Operators for Stream Processing and SQL :
>
> At LinkedIn, there is a growing need to iterate Samza jobs faster in the
> loop of implement, deploy, revise the code, and deploy again. A key
> bottleneck that slows down this iteration is the implementation of a Samza
> job. It has long been recognized in the Samza community that there is a
> strong need for high-level language support to shorten this iterative
> process. Since last year, we have 

Re: Executing Samza jobs natively in Kubernetes

2015-11-29 Thread Roger Hoover
Elias,

I would also love to be able to deploy Samza on Kubernetes with dynamic
task management.  Thanks for sharing this.  It may be a good interim
solution.

Roger

On Sun, Nov 29, 2015 at 11:18 AM, Elias Levy 
wrote:

> I've been exploring Samza for stream processing as well as Kubernetes as a
> container orchestration system and I wanted to be able to use one with the
> other.  The prospect of having to execute YARN either alongside or on top
> of Kubernetes did not appeal to me, so I developed a KubernetesJob
> implementation of SamzaJob.
>
> You can find the details at https://github.com/eliaslevy/samza_kubernetes,
> but in summary KubernetesJob executes and generates a serialized JobModel.
> Instead of interacting with Kubernetes directly to create the
> SamzaContainers (as the YarnJob's SamzaApplicationMaster may do with the
> YARN RM), it outputs a config YAML file that can be used to create the
> SamzaContainers in Kubernetes by using Resource Controllers.  For this you
> need to package your job as a Docker image.  You can read the README at
> the above repo for details.
>
> A few observations:
>
> It would be useful if SamzaContainer accepted the JobModel via an
> environment variable.  Right now it expects a URL to download it from.  I
> get around this by using an entry point script that copies the model from an
> environment variable into a file, then passes a file URL to SamzaContainer.
>
> SamzaContainer doesn't allow you to configure the JMX port.  It selects a
> port at random from the ephemeral range as it expects to execute in YARN
> where a static port could result in a conflict.  This is not the case in
> Kubernetes where each Pod (i.e. SamzaContainer) is given its own IP
> address.
>
> This implementation doesn't provide a Samza dashboard, which in the YARN
> implementation is hosted in the Application Master.  There didn't seem to
> be much value provided by the dashboard that is not already provided by the
> Kubernetes tools for monitoring pods.
>
> I've successfully executed the hello-samza jobs in Kubernetes:
>
> $ kubectl get po
> NAME   READY STATUSRESTARTS   AGE
> kafka-1-jjh8n  1/1   Running   0  2d
> kafka-2-buycp  1/1   Running   0  2d
> kafka-3-tghkp  1/1   Running   0  2d
> wikipedia-feed-0-4its2 1/1   Running   0  1d
> wikipedia-parser-0-l0onv   1/1   Running   0  17h
> wikipedia-parser-1-crrxh   1/1   Running   0  17h
> wikipedia-parser-2-1c5nn   1/1   Running   0  17h
> wikipedia-stats-0-3gaiu1/1   Running   0  16h
> wikipedia-stats-1-j5qlk1/1   Running   0  16h
> wikipedia-stats-2-2laos1/1   Running   0  16h
> zookeeper-1-1sb4a  1/1   Running   0  2d
> zookeeper-2-dndk7  1/1   Running   0  2d
> zookeeper-3-46n09  1/1   Running   0  2d
>
>
> Finally, accessing services within the Kubernetes cluster from the outside
> is quite cumbersome unless one uses an external load balancer.  This makes
> it difficult to bootstrap a job, as SamzaJob must connect to Zookeeper and
> Kafka to find out the number of partitions on the topics it will subscribe
> to, so it can assign them statically among the number of containers
> requested.
>
> Ideally Samza would operate along the lines of the Kafka high-level
> consumer, which dynamically coordinates to allocate work among members of a
> consumer group.  This would do away with the need to execute SamzaJob a
> priori to generate the JobModel to pass to the SamzaContainers.  It would
> also allow for dynamically changing the number of containers without having
> to shut down the job.
>


Re: Executing Samza jobs natively in Kubernetes

2015-11-30 Thread Roger Hoover
Awesome.  Thanks.

On Sun, Nov 29, 2015 at 3:25 PM, Elias Levy <fearsome.lucid...@gmail.com>
wrote:

> Roger,
>
> You are welcome.  If you want to experiment, you can use my hello samza
> <https://hub.docker.com/r/elevy/hello-samza/> Docker image.
>
> On Sun, Nov 29, 2015 at 12:19 PM, Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>
> > Elias,
> >
> > I would also love to be able to deploy Samza on Kubernetes with dynamic
> > task management.  Thanks for sharing this.  It may be a good interim
> > solution.
> >
> > Roger
> >
> > On Sun, Nov 29, 2015 at 11:18 AM, Elias Levy <
> fearsome.lucid...@gmail.com>
> > wrote:
> >
> > > I've been exploring Samza for stream processing as well as Kubernetes
> as
> > a
> > > container orchestration system and I wanted to be able to use one with
> > the
> > > other.  The prospect of having to execute YARN either alongside or on
> > top
> > > of Kubernetes did not appeal to me, so I developed a KubernetesJob
> > > implementation of SamzaJob.
> > >
> > > You can find the details at
> > https://github.com/eliaslevy/samza_kubernetes,
> > > but in summary KubernetesJob executes and generates a serialized
> > JobModel.
> > > Instead of interacting with Kubernetes directly to create the
> > > SamzaContainers (as the YarnJob's SamzaApplicationMaster may do with
> the
> > > YARN RM), it outputs a config YAML file that can be used to create the
> > > SamzaContainers in Kubernetes by using Resource Controllers.  For this
> > you
> > > need to package your job as a Docker image.  You can read the
> README
> > at
> > > the above repo for details.
> > >
> > > A few observations:
> > >
> > > It would be useful if SamzaContainer accepted the JobModel via an
> > > environment variable.  Right now it expects a URL to download it
> from.  I
> > > get around this by using an entry point script that copies the model
> from
> > an
> > > environment variable into a file, then passes a file URL to
> > SamzaContainer.
> > >
> > > SamzaContainer doesn't allow you to configure the JMX port.  It
> selects a
> > > port at random from the ephemeral range as it expects to execute in
> YARN
> > > where a static port could result in a conflict.  This is not the case
> in
> > > Kubernetes where each Pod (i.e. SamzaContainer) is given its own IP
> > > address.
> > >
> > > This implementation doesn't provide a Samza dashboard, which in the
> YARN
> > > implementation is hosted in the Application Master.  There didn't seem
> to
> > > be much value provided by the dashboard that is not already provided by
> > the
> > > Kubernetes tools for monitoring pods.
> > >
> > > I've successfully executed the hello-samza jobs in Kubernetes:
> > >
> > > $ kubectl get po
> > > NAME   READY STATUSRESTARTS   AGE
> > > kafka-1-jjh8n  1/1   Running   0  2d
> > > kafka-2-buycp  1/1   Running   0  2d
> > > kafka-3-tghkp  1/1   Running   0  2d
> > > wikipedia-feed-0-4its2 1/1   Running   0  1d
> > > wikipedia-parser-0-l0onv   1/1   Running   0  17h
> > > wikipedia-parser-1-crrxh   1/1   Running   0  17h
> > > wikipedia-parser-2-1c5nn   1/1   Running   0  17h
> > > wikipedia-stats-0-3gaiu1/1   Running   0  16h
> > > wikipedia-stats-1-j5qlk1/1   Running   0  16h
> > > wikipedia-stats-2-2laos1/1   Running   0  16h
> > > zookeeper-1-1sb4a  1/1   Running   0  2d
> > > zookeeper-2-dndk7  1/1   Running   0  2d
> > > zookeeper-3-46n09  1/1   Running   0  2d
> > >
> > >
> > > Finally, accessing services within the Kubernetes cluster from the
> > outside
> > > is quite cumbersome unless one uses an external load balancer.  This
> > makes
> > > it difficult to bootstrap a job, as SamzaJob must connect to Zookeeper
> > and
> > > Kafka to find out the number of partitions on the topics it will
> > subscribe
> > > to, so it can assign them statically among the number of containers
> > > requested.
> > >
> > > Ideally Samza would operate along the lines of the Kafka high-level
> > > consumer, which dynamically coordinates to allocate work among members
> of
> > a
> > > consumer group.  This would do away with the need to execute SamzaJob a
> > > priori to generate the JobModel to pass to the SamzaContainers.  It
> would
> > > also allow for dynamically changing the number of containers without
> > having
> > > to shut down the job.
> > >
> >
>


Re: HTTP-based Elasticsearch system producer and reusable task

2016-02-09 Thread Roger Hoover
Hi Yi,

It could be merged into the Samza project if there's enough interest but
may need some re-working depending on which dependencies are ok to bring
in.  I did it outside of the Samza project first because I had to get it
done quickly so it relies on Java 8 features, dropwizard metrics for
histogram metrics, and JEST (https://github.com/searchbox-io/Jest) which
itself drags in more dependencies (Guava, Gson, commons http).

There are a few issues with the existing ElasticsearchSystemProducer:

   1. The plugin API (IndexRequestFactory) is tied to the Elasticsearch
   Java API (a bulky dependency)
   2. It only supports index requests.  I needed to also support updates
   and deletes.
   3. There is currently no plugin mechanism to register a flush listener.
   The reason I needed that was to be able to report end-to-end latency stats
   (total pipeline latency = commit time - event time).

#3 is easily solvable with additional plugin options. #1 and #2 require
changing the system producer API.
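
For reference, the flush listener in #3 is shaped roughly like this
(hypothetical interface, not in Samza today; Histogram is dropwizard's):

```java
import com.codahale.metrics.Histogram;

// Called once per successful bulk flush so the job can report
// end-to-end latency (commit time - event time).
public class LatencyFlushListener {
    private final Histogram pipelineLatencyMs;

    public LatencyFlushListener(Histogram pipelineLatencyMs) {
        this.pipelineLatencyMs = pipelineLatencyMs;
    }

    public void onFlush(long commitTimeMs, Iterable<Long> eventTimesMs) {
        for (long eventTimeMs : eventTimesMs) {
            pipelineLatencyMs.update(commitTimeMs - eventTimeMs);
        }
    }
}
```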

Roger

On Tue, Feb 9, 2016 at 10:56 AM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Roger,
>
> That's awesome! Are you planning to submit the HTTP-based system producer
> in Samza open-source samza-elasticsearch module? If ElasticSearch community
> suggest that HTTP-based clients be the recommended way, we should use it in
> samza-elasticsearch as well. And what's your opinion on the existing
> ElasticsearchSystemProducer? If the SystemProducer APIs and configure
> options do not change, I would vote to replace the implementation w/
> HTTP-based ElasticsearchSystemProducer.
>
> Thanks for putting this new additions up!
>
> -Yi
>
> On Tue, Feb 9, 2016 at 10:39 AM, Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>
> > Hi Samza folks,
> >
> > For people who want to use HTTP to integrate with Elasticsearch, I wrote
> an
> > HTTP-based system producer and a reusable task, including latency stats
> > from event origin time, task processing time, and time spent talking to
> > Elasticsearch API.
> >
> >
> https://github.com/quantiply/rico/blob/master/docs/common_tasks/es-push.md
> >
> > Cheers,
> >
> > Roger
> >
>


Re: ElasticsearchSystemProducer Crashes Samza Job

2016-02-16 Thread Roger Hoover
Hi Jeremiah,

There's currently no way to do that.  I think the best way to modify the
existing ElasticsearchSystemProducer would be to add a config option for a
callback to let you customize this behavior.  Basically, a pluggable
listener (
https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java#L101
).



On Mon, Feb 15, 2016 at 2:30 PM, jeremiah adams  wrote:

> We have a samza job configured to run in a yarn cluster. This job consumes
> multiple kafka topics and routes the messages to elasticsearch for
> indexing. When enough batch-updates to elasticsearch fail using the
> ElasticsearchSystemProducer, the entire samza job dies. Due to
> checkpointing + yarn, the job starts back up, starts reading where it left
> off and dies again. Enter loop.
>
> Updates to ES are failing due to invalid data on the part of our consumers,
> but I can't always control them, so I need to be defensive in the code. I
> don't see how to handle this in any of the source examples. I would like to
> just trap this error and, if it is what I expect it to be, squash it. Can
> someone point me in the right direction?
>
> Below is the log where the failure occurs.
>
> 2016-02-15 18:55:26 ElasticsearchSystemProducer [ERROR] Unable to send
> message from TaskName-Partition 5 to system elastic.
> 2016-02-15 18:55:26 SamzaContainerExceptionHandler [ERROR] Uncaught
> exception in thread (name=main). Exiting process now.
> org.apache.samza.SamzaException: Unable to send message from
> TaskName-Partition 5 to system elastic.
> at org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.flush(ElasticsearchSystemProducer.java:186)
> at org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.stop(ElasticsearchSystemProducer.java:92)
> at org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
> at org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> at org.apache.samza.system.SystemProducers.stop(SystemProducers.scala:47)
> at org.apache.samza.container.SamzaContainer.shutdownProducers(SamzaContainer.scala:672)
> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:564)
> at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92)
> at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66)
> at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> 2016-02-15 18:55:26 RunLoop [INFO] Shutting down, will wait up to 5000 ms
> 2016-02-15 18:55:31 RunLoop [WARN] Did not shut down within 5000 ms,
> exiting
>
> - jeremiah
>


Re: ElasticsearchSystemProducer Crashes Samza Job

2016-02-16 Thread Roger Hoover
The code that you showed below is part of the BulkProcessor.Listener
interface, so if that listener were pluggable, you could override the
default behavior (which is to only ignore version conflicts).
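
For example, assuming a pluggable listener along those lines, an
implementation that also treats mapping errors (HTTP 400) as non-fatal
might look roughly like this (just a sketch, untested):

    import org.elasticsearch.action.bulk.BulkItemResponse;
    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.rest.RestStatus;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LenientBulkListener implements BulkProcessor.Listener {
      private static final Logger LOGGER =
          LoggerFactory.getLogger(LenientBulkListener.class);

      @Override
      public void beforeBulk(long executionId, BulkRequest request) {}

      @Override
      public void afterBulk(long executionId, BulkRequest request,
                            BulkResponse response) {
        for (BulkItemResponse itemResp : response) {
          if (!itemResp.isFailed()) {
            continue;
          }
          RestStatus status = itemResp.getFailure().getStatus();
          if (status.equals(RestStatus.CONFLICT)
              || status.equals(RestStatus.BAD_REQUEST)) {
            // Squash version conflicts and mapping errors: log and move on.
            LOGGER.warn("Skipping bad document: {}", itemResp.getFailureMessage());
          } else {
            // Anything else should still be surfaced as fatal, e.g. by
            // setting a flag that the producer checks in flush().
            LOGGER.error("Failed to index document in Elasticsearch: {}",
                itemResp.getFailureMessage());
          }
        }
      }

      @Override
      public void afterBulk(long executionId, BulkRequest request,
                            Throwable failure) {
        LOGGER.error("Bulk request failed", failure);
      }
    }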



On Tue, Feb 16, 2016 at 12:27 PM, jeremiah adams <jadams...@gmail.com>
wrote:

> The root of the issue may be in the HTTP status code handling. This code
> seems to imply that the only tolerated error case from Elasticsearch is a
> version conflict. That is too narrow a constraint. In one of my use cases, a
> mapping/message conflict occurs, resulting in an HTTP 400. In my case, it is
> perfectly reasonable to log the error, not set hasFatalError = true, and
> continue processing.  A way to control this via properties or some other
> mechanism would probably solve the issue. Setting the hasFatalError flag
> looks to be the source of the unhandled exception that ultimately fails the
> job.
>
> I don't think the ListenerCallback will solve the problem. I don't
> understand how that might stop the unhandled exception being raised by
> flush().
>
>   if (itemResp.getFailure().getStatus().equals(RestStatus.CONFLICT)) {
>     LOGGER.info("Failed to index document in Elasticsearch: " +
>         itemResp.getFailureMessage());
>   } else {
>     hasFatalError = true;
>     LOGGER.error("Failed to index document in Elasticsearch: " +
>         itemResp.getFailureMessage());
>   }
>
> - jeremiah
>
> On Tue, Feb 16, 2016 at 12:18 PM, Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>
> > Hi Jeremiah,
> >
> > There's currently no way to do that.  I think the best way to modify the
> > existing ElasticsearchSystemProducer would be to add a config option for
> > a callback to let you customize this behavior.  Basically, a pluggable
> > listener (
> > https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java#L101
> > ).
> >
> >
> >
> > On Mon, Feb 15, 2016 at 2:30 PM, jeremiah adams <jadams...@gmail.com>
> > wrote:
> >
> > > We have a samza job configured to run in a yarn cluster. This job
> > consumes
> > > multiple kafka topics and routes the messages to elasticsearch for
> > > indexing. When enough batch-updates to elasticsearch fail using the
> > > ElasticsearchSystemProducer, the entire samza job dies. Due to
> > > checkpointing + yarn, the job starts back up, starts reading where it
> > > left off and dies again. Enter loop.
> > >
> > > Updates to ES are failing due to invalid data on the part of our
> > > consumers, but I can't always control them, so I need to be defensive
> > > in the code. I don't see how to handle this in any of the source
> > > examples. I would like to just trap this error and, if it is what I
> > > expect it to be, squash it. Can someone point me in the right direction?
> > >
> > > Below is the log where the failure occurs.
> > >
> > > 2016-02-15 18:55:26 ElasticsearchSystemProducer [ERROR] Unable to send
> > > message from TaskName-Partition 5 to system elastic.
> > > 2016-02-15 18:55:26 SamzaContainerExceptionHandler [ERROR] Uncaught
> > > exception in thread (name=main). Exiting process now.
> > > org.apache.samza.SamzaException: Unable to send message from
> > > TaskName-Partition 5 to system elastic.
> > > at org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.flush(ElasticsearchSystemProducer.java:186)
> > > at org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.stop(ElasticsearchSystemProducer.java:92)
> > > at org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
> > > at org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
> > > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > > at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> > > at org.apache.samza.system.SystemProducers.stop(SystemProducers.scala:47)
> > > at org.apache.samza.container.SamzaContainer.shutdownProducers(SamzaContainer.scala:672)
> > > at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:564)
> > > at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92)
> > > at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66)
> > > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> > > 2016-02-15 18:55:26 RunLoop [INFO] Shutting down, will wait up to 5000 ms
> > > 2016-02-15 18:55:31 RunLoop [WARN] Did not shut down within 5000 ms, exiting
> > >
> > > - jeremiah
> > >
> >
>


Re: [DISCUSS] Moving to github/pull-request for code review and check-in

2016-02-18 Thread Roger Hoover
+1 - Thanks for bringing this up, Yi.  I've done it both ways and feel pull 
requests are much easier.

Sent from my iPhone

> On Feb 18, 2016, at 4:25 PM, Navina Ramesh  
> wrote:
> 
> +1
> 
> Haven't tried contributing via pull requests, but it sounds simpler than
> attaching the patch to JIRA.
> 
> Navina
> 
>> On Thu, Feb 18, 2016 at 4:01 PM, Jacob Maes  wrote:
>> 
>> +1
>> 
>> As a relatively new contributor to Samza, I've certainly felt the current
>> process was overly complicated.
>> 
>>> On Thu, Feb 18, 2016 at 3:53 PM, Yi Pan  wrote:
>>> 
>>> Hi, all,
>>> 
>>> I want to start the discussion on our code review/commit process.
>>> 
>>> I felt that our code review and check-in process is a little bit
>>> cumbersome:
>>> - developers need to create RBs and attach diff to JIRA
>>> - committers need to review RBs, download the diff and apply it, then push.
>>> 
>>> It would be much lighter-weight if we took the pull-request-only
>>> approach that Kafka has already converted to:
>>> - for the developers, the only thing needed is to open a pull request.
>>> - for committers, review and apply patch is from the same PR and merge
>> can
>>> be done directly on remote git repo.
>>> 
>>> Of course, there might be some hookup scripts that we will need to link
>>> JIRA w/ pull requests in GitHub, which Kafka already has. Any comments
>>> and feedback are welcome!
>>> 
>>> Thanks!
>>> 
>>> -Yi
> 
> 
> 
> -- 
> Navina R.


Re: HTTP-based Elasticsearch system producer and reusable task

2016-02-10 Thread Roger Hoover
Hi Yi,

Please see my comment inline.

On Tue, Feb 9, 2016 at 10:08 PM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Roger,
>
> Got it! I would like to understand more about the SystemProducer API changes
> required by #1 and #2. Could you elaborate a bit more?
>
>
For #1, the IndexRequestFactory interface (
https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/IndexRequestFactory.java#L32)
returns an object of type (org.elasticsearch.action.index.IndexRequest)
which comes from the elasticsearch jar (
https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java
).

For #2, the way to support updates and deletes would be to have the
"IndexRequestFactory" return a more generic "ActionRequest", which could be
an index, update, or delete action (in which case it should be called
ActionRequestFactory).
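
Roughly, the generalized interface might look like this (a sketch only;
the name ActionRequestFactory is hypothetical, and how to keep the old
interface around for compatibility is a separate question):

    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.elasticsearch.action.ActionRequest;

    // Hypothetical generalization of IndexRequestFactory: returning the
    // broader ActionRequest type admits index, update, and delete requests,
    // since IndexRequest, UpdateRequest, and DeleteRequest all extend it.
    public interface ActionRequestFactory {
      ActionRequest getActionRequest(OutgoingMessageEnvelope envelope);
    }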




> Regarding JDK8 being required in the new HTTP-based Elasticsearch
> producer, I want to ask what motivated you to go w/ JDK8. It does bring a
> lot of nice features. If we drop source-level compatibility with JDK7, we
> can benefit from a lot of new features from JDK8, like lambdas, the stream
> APIs, etc. And refactoring Scala code to JDK8 is also much easier.
>
>
I really like that functions are first-class citizens in Java 8, and the
Stream API is quite helpful as well.

For example, first-class functions helped me avoid duplicate code that
would have occurred because JEST doesn't expose a common ancestor type for
each type of builder (index, update, delete).  Passing in functions instead
of objects with a common ancestor type solved the problem.

https://github.com/quantiply/rico/blob/master/samza-elasticsearch/src/main/java/com/quantiply/elasticsearch/HTTPBulkLoader.java#L237-L272
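
As a simplified, self-contained illustration of the pattern (stand-in types
only, not the actual Rico code):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    public class ActionDispatchExample {
      enum Op { INDEX, UPDATE, DELETE }

      public static void main(String[] args) {
        // JEST's Index.Builder, Update.Builder, and Delete.Builder share no
        // useful common supertype, so the per-operation construction logic
        // is passed around as a plain function instead.
        Map<Op, Function<String, String>> builders = new HashMap<>();
        builders.put(Op.INDEX, doc -> "index action for " + doc);
        builders.put(Op.UPDATE, doc -> "update action for " + doc);
        builders.put(Op.DELETE, doc -> "delete action for " + doc);

        // A single shared code path now handles all three operations.
        System.out.println(builders.get(Op.UPDATE).apply("{\"id\": 1}"));
      }
    }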


> Thanks!
>
> -Yi
>
> On Tue, Feb 9, 2016 at 4:19 PM, Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>
> > Hi Yi,
> >
> > It could be merged into the Samza project if there's enough interest but
> > may need some re-working depending on which dependencies are ok to bring
> > in.  I did it outside of the Samza project first because I had to get it
> > done quickly, so it relies on Java 8 features, dropwizard metrics for
> > histogram metrics, and JEST (https://github.com/searchbox-io/Jest) which
> > itself drags in more dependencies (Guava, Gson, commons http).
> >
> > There are a few issues with the existing ElasticsearchSystemProducer:
> >
> >1. The plugin API (IndexRequestFactory) is tied to the Elasticsearch
> >Java API (a bulky dependency)
> >2. It only supports index requests.  I needed to also support updates
> >and deletes.
> >    3. There is currently no plugin mechanism to register a flush listener.
> >The reason I needed that was to be able to report end to end latency
> > stats
> >(total pipeline latency = commit time - event time).
> >
> > #3 is easily solvable with an additional plugin option. #1 and #2 require
> > changing the system producer API.
> >
> > Roger
> >
> > On Tue, Feb 9, 2016 at 10:56 AM, Yi Pan <nickpa...@gmail.com> wrote:
> >
> > > Hi, Roger,
> > >
> > > That's awesome! Are you planning to submit the HTTP-based system
> > > producer in the Samza open-source samza-elasticsearch module? If the
> > > Elasticsearch community suggests that HTTP-based clients are the
> > > recommended way, we should use it in samza-elasticsearch as well. And
> > > what's your opinion on the existing ElasticsearchSystemProducer? If
> > > the SystemProducer APIs and config options do not change, I would vote
> > > to replace the implementation w/ HTTP-based ElasticsearchSystemProducer.
> > >
> > > Thanks for putting these new additions up!
> > >
> > > -Yi
> > >
> > > On Tue, Feb 9, 2016 at 10:39 AM, Roger Hoover <roger.hoo...@gmail.com>
> > > wrote:
> > >
> > > > Hi Samza folks,
> > > >
> > > > For people who want to use HTTP to integrate with Elasticsearch, I
> > > > wrote an HTTP-based system producer and a reusable task, including
> > > > latency stats from event origin time, task processing time, and time
> > > > spent talking to the Elasticsearch API.
> > > >
> > > > https://github.com/quantiply/rico/blob/master/docs/common_tasks/es-push.md
> > > >
> > > > Cheers,
> > > >
> > > > Roger
> > > >
> > >
> >
>


Re: ThreadJobFactory in production

2016-03-02 Thread Roger Hoover
Jose,

It would be great if you could share it.  I'm interested in trying to use
it as well.

Thanks,

Roger

On Wed, Mar 2, 2016 at 2:31 PM, José Barrueta  wrote:

> Hi guys,
>
> At Stormpath, we made a custom Samza 0.10 version merging SAMZA-41 into
> it. It's working well, so we are planning to update that patch later this
> week so it can be added to the main project.
>
> HTH,
>
> Jose Luis Barrueta
>
> On Wed, Mar 2, 2016 at 2:11 PM, Yi Pan  wrote:
>
> > Hi, Robert,
> >
> > The main reason that ThreadJobFactory and ProcessJobFactory are not
> > considered "production-ready" is that there is only one container for the
> > job and all tasks are assigned to the single container. Hence, it is not
> > easy to scale out of a single host.
> >
> > As Rick mentioned, Netflix has put up a patch in SAMZA-41, based on 0.9.1,
> > to allow static assignment of a subset of partitions to a single
> > ProcessJob, which makes it possible to launch multiple ProcessJobs on
> > different hosts. We planned to merge it into 0.10, but it turns out that
> > too many changes have gone into 0.10 and it became difficult to merge the
> > patch. At this point, we can still try the following two options:
> > 1) We can attempt to merge SAMZA-41 into 0.10.1 again; it may take some
> > effort but would give a stop-gap solution.
> > 2) We are working on a standalone Samza model (SAMZA-516, SAMZA-881) to
> > allow users to run Samza w/o depending on YarnJobFactory. This is a
> > long-term effort and will take some time to flesh out. Please join the
> > discussion there s.t. we can be more aligned in our effort.
> >
> > Hope the above gives you an overall picture on where we are going.
> >
> > Thanks a lot!
> >
> > -Yi
> >
> > On Wed, Mar 2, 2016 at 1:28 PM, Rick Mangi  wrote:
> >
> > > There was an interesting thread a while back from, I believe, the
> > > Netflix guys about running ThreadJobFactory in production.
> > >
> > >
> > > > On Mar 2, 2016, at 4:20 PM, Robert Crim  wrote:
> > > >
> > > > Hi,
> > > >
> > > > We're currently working on a solution that allows us to run Samza
> > > > jobs on Mesos. This seems to be going well, and it is something we'd
> > > > like to move away from when native Mesos support is added to Samza.
> > > >
> > > > While we're developing and testing our scheduler, I'm wondering
> > > > about the implications of running tasks with the ThreadJobFactory in
> > > > "production". The documentation advises against this, but it's not
> > > > clear why.
> > > >
> > > > If we were using the ThreadJobFactory inside of a Docker container
> > > > on Mesos with Marathon for production, what would be our main
> > > > problem? These are not particularly high-load tasks. Aside from not
> > > > being able to get fine-grained resource scheduling per task, it
> > > > seems like the main issue is not being able to easily tell when a
> > > > job stops due to an error / exception.
> > > >
> > > > In other words, what would be the show-stopping reasons not to use
> > > > the ThreadJobFactory in production?
> > > >
> > > > Thanks,
> > > > Rob
> > >
> > >
> >
>