Kafka topic naming conventions
Hi,

Wondering what naming conventions people are using for topics in Kafka. When there's re-partitioning involved, you can end up with multiple topics that have the exact same data but are partitioned differently. How do you name them?

Thanks,
Roger
Re: Kafka topic naming conventions
Thanks, guys. I was also playing around with including the partition count and even the partition key in the topic name. My thought was that topics may have the same data and number of partitions but differ only by partition key. After a while, the naming does get crazy (too long and ugly). We really need a topic metadata store.

On Wed, Mar 18, 2015 at 6:21 PM, Chinmay Soman chinmay.cere...@gmail.com wrote:

Yeah! It does seem a bit hackish, but I think this approach promises fewer config/operation errors. Although I think some of these checks can be built within Samza - assuming Kafka has a metadata store in the near future, the Samza container can validate the #topics against this store.

On Wed, Mar 18, 2015 at 6:16 PM, Chris Riccomini criccom...@apache.org wrote:

Hey Chinmay,
Cool, this is good feedback. I didn't think I was *that* crazy. :)
Cheers, Chris

On Wed, Mar 18, 2015 at 6:10 PM, Chinmay Soman chinmay.cere...@gmail.com wrote:

That's what we're doing as well - appending the partition count to the Kafka topic name. This actually helps keep track of the #partitions for each topic (since Kafka doesn't have a metadata store yet). In case of topic expansion, we actually just resort to creating a new topic. Although that is an overhead, the thought process is that this will minimize operational errors. Also, this is necessary to do in case we're doing some kind of joins.

On Wed, Mar 18, 2015 at 5:59 PM, Jakob Homan jgho...@gmail.com wrote:

On 18 March 2015 at 17:48, Chris Riccomini criccom...@apache.org wrote: One thing I haven't seen, but might be relevant, is including partition counts in the topic.

Yeah, but then if you change the partition count later on, you've got incorrect information forever. Or you need to create a new stream, which might be a nice forcing function to make sure your join isn't screwed up. There'd need to be something somewhere to enforce that though.

--
Thanks and regards
Chinmay Soman
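[Editor's note: the convention discussed above - encoding partition key and count into the topic name - could be sketched as a small helper. This is a hypothetical naming scheme for illustration, not an established standard; the dataset/key/count format is an assumption.]

```python
def topic_name(dataset, partition_key, partition_count, version=1):
    """Build a topic name that encodes how the data is partitioned.

    Hypothetical convention: <dataset>.by_<key>.<count>p.v<version>,
    so two copies of the same data partitioned differently (or re-keyed)
    never collide on name, and the #partitions is visible without a
    metadata store.
    """
    return "{}.by_{}.{}p.v{}".format(
        dataset, partition_key, partition_count, version)

# Two re-partitioned copies of the same data get distinct names:
a = topic_name("page_views", "user_id", 16)     # page_views.by_user_id.16p.v1
b = topic_name("page_views", "session_id", 8)   # page_views.by_session_id.8p.v1
```

As the thread notes, the downside is that expanding partitions forces a new topic name - which is exactly the "forcing function" Jakob describes for keeping joins correct.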
Re: Log error deploying on YARN [Samza 0.8.0]
Ah, yes. That's it! Thanks, Chris.

On Tue, Mar 24, 2015 at 2:30 PM, Chris Riccomini criccom...@apache.org wrote:

Hey Roger,

You're likely hitting this issue: https://issues.apache.org/jira/browse/SAMZA-456

Can you have a look and see if that's the problem? We missed some JARs that need to be put in to the YARN NM classpath.

Cheers,
Chris

On Tue, Mar 24, 2015 at 2:22 PM, Roger Hoover roger.hoo...@gmail.com wrote:

Hi all,

I'm new to YARN and trying to have YARN download the Samza job tarball (https://samza.apache.org/learn/tutorials/0.8/run-in-multi-node-yarn.html). From the log, it seems that the download failed. I've tested that the file is available via curl. The error message is: org/apache/samza/util/Logging

I appreciate any suggestions.

Roger

2015-03-24 17:13:05,469 INFO [Socket Reader #1 for port 33749] ipc.Server (Server.java:saslProcess(1294)) - Auth successful for appattempt_1427226422217_0005_02 (auth:SIMPLE)
2015-03-24 17:13:05,473 INFO [IPC Server handler 15 on 33749] containermanager.ContainerManagerImpl (ContainerManagerImpl.java:startContainerInternal(572)) - Start request for container_1427226422217_0005_02_01 by user opintel
2015-03-24 17:13:05,473 INFO [IPC Server handler 15 on 33749] nodemanager.NMAuditLogger (NMAuditLogger.java:logSuccess(89)) - USER=opintel IP=10.53.152.54 OPERATION=Start Container Request TARGET=ContainerManageImpl RESULT=SUCCESS APPID=application_1427226422217_0005 CONTAINERID=container_1427226422217_0005_02_01
2015-03-24 17:13:05,473 INFO [AsyncDispatcher event handler] application.Application (ApplicationImpl.java:transition(296)) - Adding container_1427226422217_0005_02_01 to application application_1427226422217_0005
2015-03-24 17:13:05,474 INFO [AsyncDispatcher event handler] container.Container (ContainerImpl.java:handle(884)) - Container container_1427226422217_0005_02_01 transitioned from NEW to LOCALIZING
2015-03-24 17:13:05,474 INFO [AsyncDispatcher event handler] containermanager.AuxServices (AuxServices.java:handle(175)) - Got event CONTAINER_INIT for appId application_1427226422217_0005
2015-03-24 17:13:05,475 INFO [AsyncDispatcher event handler] localizer.LocalizedResource (LocalizedResource.java:handle(196)) - Resource http://somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz transitioned from INIT to DOWNLOADING
2015-03-24 17:13:05,475 INFO [AsyncDispatcher event handler] localizer.ResourceLocalizationService (ResourceLocalizationService.java:handle(596)) - Created localizer for container_1427226422217_0005_02_01
2015-03-24 17:13:05,480 INFO [LocalizerRunner for container_1427226422217_0005_02_01] localizer.ResourceLocalizationService (ResourceLocalizationService.java:writeCredentials(1029)) - Writing credentials to the nmPrivate file /tmp/hadoop-opintel/nm-local-dir/nmPrivate/container_1427226422217_0005_02_01.tokens. Credentials list:
2015-03-24 17:13:05,481 INFO [LocalizerRunner for container_1427226422217_0005_02_01] nodemanager.DefaultContainerExecutor (DefaultContainerExecutor.java:createUserCacheDirs(469)) - Initializing user opintel
2015-03-24 17:13:05,492 INFO [LocalizerRunner for container_1427226422217_0005_02_01] nodemanager.DefaultContainerExecutor (DefaultContainerExecutor.java:startLocalizer(103)) - Copying from /tmp/hadoop-opintel/nm-local-dir/nmPrivate/container_1427226422217_0005_02_01.tokens to /tmp/hadoop-opintel/nm-local-dir/usercache/opintel/appcache/application_1427226422217_0005/container_1427226422217_0005_02_01.tokens
2015-03-24 17:13:05,492 INFO [LocalizerRunner for container_1427226422217_0005_02_01] nodemanager.DefaultContainerExecutor (DefaultContainerExecutor.java:startLocalizer(105)) - CWD set to /tmp/hadoop-opintel/nm-local-dir/usercache/opintel/appcache/application_1427226422217_0005 = file:/tmp/hadoop-opintel/nm-local-dir/usercache/opintel/appcache/application_1427226422217_0005
2015-03-24 17:13:05,520 INFO [IPC Server handler 4 on 8040] localizer.ResourceLocalizationService (ResourceLocalizationService.java:update(928)) - DEBUG: FAILED { http://somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz, 0, ARCHIVE, null }, org/apache/samza/util/Logging
2015-03-24 17:13:05,520 INFO [IPC Server handler 4 on 8040] localizer.LocalizedResource (LocalizedResource.java:handle(196)) - Resource http://somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz transitioned from DOWNLOADING to FAILED
2015-03-24 17:13:05,520 INFO [AsyncDispatcher event handler] container.Container (ContainerImpl.java:handle(884)) - Container container_1427226422217_0005_02_01 transitioned from LOCALIZING to LOCALIZATION_FAILED
2015-03-24 17:13:05,521 INFO [AsyncDispatcher event handler] localizer.LocalResourcesTrackerImpl
Re: Error running integration tests
Do I need to bring up sshd on my laptop, or can the tests be made not to use ssh?

On Wed, Mar 25, 2015 at 4:27 PM, Roger Hoover roger.hoo...@gmail.com wrote:

Hi,

I wanted to see if I could run the integration tests on the 0.9.0 branch on my Mac. I cloned the 0.9.0 branch from the github mirror, built everything (./gradlew clean build), and tried to run the integration tests:

./bin/integration-tests.sh /tmp/roger

I get an error when the test script tries to deploy ZooKeeper using SSH. I'm running on Mac OS X. Any suggestions?

Thanks,
Roger

2015-03-25 16:11:40,368 zopkio.test_runner [ERROR] Aborting single execution due to setup_suite failure:
Traceback (most recent call last):
  File "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/test_runner.py", line 107, in run
    self.deployment_module.setup_suite()
  File "/tmp/roger/scripts/deployment.py", line 76, in setup_suite
    'hostname': host
  File "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/deployer.py", line 77, in deploy
    self.install(unique_id, configs)
  File "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/adhoc_deployer.py", line 129, in install
    with get_ssh_client(hostname, username=runtime.get_username(), password=runtime.get_password()) as ssh:
  File "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/contextlib.py", line 17, in __enter__
    return self.gen.next()
  File "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/remote_host_helper.py", line 204, in get_ssh_client
    ssh.connect(hostname, username=username, password=password)
  File "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/client.py", line 251, in connect
    retry_on_signal(lambda: sock.connect(addr))
  File "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/util.py", line 270, in retry_on_signal
    return function()
  File "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/client.py", line 251, in <lambda>
    retry_on_signal(lambda: sock.connect(addr))
  File "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 61] Connection refused
Error running integration tests
Hi,

I wanted to see if I could run the integration tests on the 0.9.0 branch on my Mac. I cloned the 0.9.0 branch from the github mirror, built everything (./gradlew clean build), and tried to run the integration tests:

./bin/integration-tests.sh /tmp/roger

I get an error when the test script tries to deploy ZooKeeper using SSH. I'm running on Mac OS X. Any suggestions?

Thanks,
Roger

2015-03-25 16:11:40,368 zopkio.test_runner [ERROR] Aborting single execution due to setup_suite failure:
Traceback (most recent call last):
  File "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/test_runner.py", line 107, in run
    self.deployment_module.setup_suite()
  File "/tmp/roger/scripts/deployment.py", line 76, in setup_suite
    'hostname': host
  File "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/deployer.py", line 77, in deploy
    self.install(unique_id, configs)
  File "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/adhoc_deployer.py", line 129, in install
    with get_ssh_client(hostname, username=runtime.get_username(), password=runtime.get_password()) as ssh:
  File "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/contextlib.py", line 17, in __enter__
    return self.gen.next()
  File "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/zopkio/remote_host_helper.py", line 204, in get_ssh_client
    ssh.connect(hostname, username=username, password=password)
  File "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/client.py", line 251, in connect
    retry_on_signal(lambda: sock.connect(addr))
  File "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/util.py", line 270, in retry_on_signal
    return function()
  File "/tmp/roger/samza-integration-tests/lib/python2.7/site-packages/paramiko/client.py", line 251, in <lambda>
    retry_on_signal(lambda: sock.connect(addr))
  File "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 61] Connection refused
Re: How do you serve the data computed by Samza?
Ah, thanks for the great explanation. Any particular reason that the job(s) you described should not be Samza jobs?

We've started experimenting with such jobs for Druid and Elasticsearch. For Elasticsearch, the Samza job containers join the Elasticsearch cluster as transport nodes and use the Java API to push to ES data nodes. Likewise for Druid, the Samza job uses the Tranquility API to schedule jobs (https://github.com/metamx/tranquility/tree/master/src/main/scala/com/metamx/tranquility/samza). The nice part about push versus pull is that the downstream system does not need plugins (like ES rivers) that may complicate its configuration or destabilize the system.

Cheers,
Roger

On Tue, Mar 31, 2015 at 10:56 AM, Felix GV fville...@linkedin.com.invalid wrote:

Thanks for your reply Roger! Very insightful (:

6. If there was a highly-optimized and reliable way of ingesting partitioned streams quickly into your online serving system, would that help you leverage Samza more effectively?
6. Can you elaborate please?

Sure. The feature set I have in mind is the following:

* Provide a thinly-wrapped Kafka producer which does appropriate partitioning and includes useful metadata (such as production timestamp, etc.) alongside the payload. This producer would be used in the last step of processing of a Samza topology, in order to emit to Kafka some processed/joined/enriched data which is destined for online serving.
* Provide a consumer process which can be co-located on the same hosts as your data serving system. This process consumes from the appropriate partitions and checkpoints its offsets on its own. It leverages Kafka batching and compression to make consumption very efficient.
* For each record, the consumer process issues a put/insert locally to the co-located serving process. Since this is a local operation, it is also very cheap and efficient.
* The consumer process can also optionally throttle its insertion rate by monitoring some performance metrics of the co-located data serving process. For example, if the data serving process exposes a p99 latency via JMX or other means, this can be used in a tight feedback loop to back off if read latency degrades beyond a certain threshold.
* This ingestion platform should be easy to integrate with any consistently-routed data serving system, by implementing some simple interfaces to let the ingestion system understand the key-to-partition assignment strategy, as well as the partition-to-node assignment strategy. Optionally, a hook to access performance metrics could also be implemented if throttling is deemed important (as described in the previous point).
* Since the consumer lives in a separate process, the system benefits from good isolation guarantees. The consumer process can be capped to a low amount of heap, and its GC is inconsequential for the serving platform. It's also possible to bounce the consumer and data serving processes independently of each other, if need be.

There are some more nuances and additional features which could be nice to have, but that's the general idea. It seems to me like such a system would be valuable, but I'm wondering what other people in the open-source community think, hence why I was interested in starting this thread...

Thanks for your feedback!
-F
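[Editor's note: the throttling feedback loop Felix describes could look roughly like this minimal sketch. All names here are hypothetical: read_p99_ms() stands in for a JMX/metrics hook on the serving process, apply_put() for the local put/insert call.]

```python
import time

def throttled_ingest(records, apply_put, read_p99_ms,
                     threshold_ms=50.0, backoff_s=0.1):
    """Push records into a co-located store, backing off while reads degrade.

    Hypothetical sketch: read_p99_ms() stands in for a metrics hook on the
    serving process (e.g. a p99 read latency exposed via JMX); apply_put()
    stands in for the cheap local insert into the co-located store.
    """
    ingested = 0
    for record in records:
        # Tight feedback loop: hold off while read latency is over threshold,
        # so ingestion never pushes the serving process past its SLA.
        while read_p99_ms() > threshold_ms:
            time.sleep(backoff_s)
        apply_put(record)
        ingested += 1
    return ingested
```

The design choice worth noting is that the backpressure signal comes from the serving process's own read path, not from write errors, so the consumer slows down before tenants see timeouts.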
Re: [VOTE] Apache Samza 0.9.0 RC0
Nice. Thanks Yan!

On Tue, Mar 31, 2015 at 3:24 PM, Yan Fang yanfang...@gmail.com wrote:

Cool.
* Published to Maven; it's already there.
* Uploaded to dist/release. It may take a while for mirrors to pick it up.
* Updated the download page in https://issues.apache.org/jira/browse/SAMZA-624
** Will publish the website after mirrors pick up the 0.9.0 release.
** In terms of the blog, it seems I don't have access?

Thanks,
Fang, Yan
yanfang...@gmail.com

On Tue, Mar 31, 2015 at 1:36 PM, Chris Riccomini criccom...@apache.org wrote:

Hey Yan/Jakob,
Awesome, thanks! Yan, feel free to finish up the release. :) Very cool!
Cheers, Chris

On Tue, Mar 31, 2015 at 1:27 PM, Jakob Homan jgho...@gmail.com wrote:

Correct. All that's necessary for a release is a more-+1s-than--1s-from-PMC-members vote, and then we can go ahead with distribution, publicity, etc.
-jg

On 31 March 2015 at 12:44, Chris Riccomini criccom...@apache.org wrote:

Hey Yan,
Let's confirm with Jakob. I *think* we don't need any intervention from Apache. We should be able to move forward with the release. @Jakob, can you confirm this?
Cheers, Chris

On Tue, Mar 31, 2015 at 11:17 AM, Yan Fang yanfang...@gmail.com wrote:

Hi all,
After 72+ hours, we got 4 binding +1 votes (Chris, Jakob, Chinmay, Yan) and 2 non-binding +1 votes (Roger, Yi Pan). The release vote passes.
@Chris, do we need a vote from the Apache general mailing list? Or can I go ahead and update the release dist, update the download page, publish the 0.9.0 binaries to Maven, and write the 0.9.0 blog post?
Thanks,
Fang, Yan
yanfang...@gmail.com

On Tue, Mar 31, 2015 at 9:21 AM, Ash W Matheson ash.mathe...@gmail.com wrote:

I'd say yes; it's been a few days with little traffic on the topic.

On Mar 31, 2015 9:18 AM, Chris Riccomini criccom...@apache.org wrote:

Hey all,
Is the vote done?
Cheers, Chris

On Mon, Mar 30, 2015 at 2:10 PM, Chris Riccomini criccom...@apache.org wrote:

+1
1. Validated hello-samza works with 0.9.0 Maven binaries.
2. Validated release-0.9.0-rc0 tag exists and has correct checksums.
3. Validated source release tarball builds, and has correct licenses (bin/check-all.sh).
4. Validated source release tarball validates against Yan's PGP key.
5. Ran rolling bounce of Kafka cluster with large job (1 million+ messages/sec).
6. Ran Zopkio integration tests, and SAMZA-394 torture test.

For (6), I ran the SAMZA-394 tests for 72 hours with the torture test running. No consistency/data loss issues! I did find an issue with the checker integration test, but I think it's best left for 0.10.0, so I'll open a JIRA to track that.

On Mon, Mar 30, 2015 at 10:49 AM, Roger Hoover roger.hoo...@gmail.com wrote:

+1
* Created and tested a sample job doing a join
* Built packages
* Couldn't get the integration tests to work. It seemed like they were timing out trying to download dependencies. I may have been having network issues last night.
Cheers, Roger

On Sun, Mar 29, 2015 at 9:09 PM, Jakob Homan jgho...@gmail.com wrote:

+1 (binding)
* Verified sig and checksum
* Spot checked files, verified license and notice
* Built packages
* Ran hello-samza
Good work, Yan.
-jg

On 29 March 2015 at 13:08, Chinmay Soman chinmay.cere...@gmail.com wrote:

+1
* Verified signature against the release
* Verified authenticity of the key
* Ran hello-samza (latest branch) - all 3 jobs succeed against the 0.9.0 release
* Ran integration tests - all 3 tests pass (after addressing SAMZA-621. Also, once a failure occurs, I have to manually kill the YARN daemons. Not sure if there's a ticket open for that - a quick search did not reveal anything.)
Good job, guys!

On Sat, Mar 28, 2015 at 1:36 AM, Yan Fang yanfang...@gmail.com wrote:

Hi Chris and Jakob,
Sure. Let's keep the vote open until Monday. Hope more folks have time to try and validate the 0.9.0 version.
Thanks,
Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108

On Fri, Mar 27, 2015 at 5:09 PM, Chris Riccomini criccom...@apache.org wrote:

Hey Yan,
Yea, could we delay until Monday? I have been doing a lot of burn-in, and have found some issues
Re: How do you serve the data computed by Samza?
Hi Felix,

1, 3. We're experimenting with both Druid and Elasticsearch for this. We're using Samza to enrich user activity and system performance events, then index them in Druid and/or Elasticsearch depending on the use case.
2. These are internal BI/operations applications.
4. We're still getting up to speed on both Druid and Elasticsearch to get the necessary write throughput. Read throughput has not been an issue.
5. Not yet, but I don't expect it will be easy.
6. Can you elaborate please?

Cheers,
Roger

On Fri, Mar 27, 2015 at 9:52 AM, Felix GV fville...@linkedin.com.invalid wrote:

Hi Samza devs, users and enthusiasts,

I've kept an eye on the Samza project for a while and I think it's super cool! I hope it continues to mature and expand as it seems very promising (:

One thing I've been wondering for a while is: how do people serve the data they computed on Samza? More specifically:

1. How do you expose the output of Samza jobs to online applications that need low-latency reads?
2. Are these online apps mostly internal (i.e.: analytics, dashboards, etc.) or public/user-facing?
3. What systems do you currently use (or plan to use in the short-term) to host the data generated in Samza? HBase? Cassandra? MySQL? Druid? Others?
4. Are you satisfied or are you facing challenges in terms of the write throughput supported by these storage/serving systems? What about read throughput?
5. Are there situations where you wish to re-process all historical data when making improvements to your Samza job, which results in the need to re-ingest all of the Samza output into your online serving system (as described in the Kappa Architecture: http://radar.oreilly.com/2014/07/questioning-the-lambda-architecture.html)? Is this easy breezy or painful? Do you need to throttle it lest your serving system fall over?
6. If there was a highly-optimized and reliable way of ingesting partitioned streams quickly into your online serving system, would that help you leverage Samza more effectively?

Your insights would be much appreciated!
Thanks (:
-- Felix
Re: Re-processing a la Kappa/Liquid
Thanks, Jay. This is one of the really nice advantages of local state in my mind. Full retention would work but would eventually run out of space, right? Ideally, Kafka would guarantee to keep dirty keys for a configurable amount of time, as Chris suggested.

Sent from my iPhone

On Feb 21, 2015, at 10:10 AM, Jay Kreps jay.kr...@gmail.com wrote:

Gotcha. Yes, if you want to be able to join to past versions you definitely can't turn on compaction, as the whole goal of that feature is to delete past versions. But wouldn't it work to use full retention if you want that (and use the MessageChooser interface during reprocessing if you want tight control over the state recreation)? I mean, you have the same dilemma if you don't use local state but instead use a remote store - the remote store likely only keeps the last version of each value, so you can't join to the past.
-Jay

On Fri, Feb 20, 2015 at 9:04 PM, Roger Hoover roger.hoo...@gmail.com wrote:

Jay,
Sorry, I didn't explain it very well. I'm talking about a stream-table join where the table comes from a compacted topic that is used to populate a local data store. As the stream events are processed, they are joined with dimension data from the local store. If you want to kick off another version of this job that starts back in time, the new job cannot reliably recreate the same state of the local store that the original had, because old values may have been compacted away. Does that make sense?
Roger

On Fri, Feb 20, 2015 at 2:52 PM, Jay Kreps jay.kr...@gmail.com wrote:

Hey Roger,
I'm not sure I understand the case you are describing. As Chris says, we don't yet give you fine-grained control over when history starts to disappear (though we designed with the intention of making that configurable later). However, I'm not sure you need that for the case you describe. Say you have a job J that takes inputs I1...IN, produces output O1...ON, and in the process accumulates state in a topic S. I think the approach is to launch a J' (changed or improved in some way) that reprocesses I1...IN from the beginning of time (or some past point) into O1'...ON' and accumulates state in S'. So the state for J and the state for J' are totally independent. J' can't reuse J's state in general because the code that generates that state may have changed.
-Jay

On Thu, Feb 19, 2015 at 9:30 AM, Roger Hoover roger.hoo...@gmail.com wrote:

Chris + Samza Devs,

I was wondering whether Samza could support re-processing as described by the Kappa architecture or Liquid (http://www.cidrdb.org/cidr2015/Papers/CIDR15_Paper25u.pdf). It seems that a changelog is not sufficient to be able to restore state backward in time. Kafka compaction will guarantee that local state can be restored from where it left off, but I don't see how it can restore past state.

Imagine the case where a stream job has a lot of state in its local store but has not updated any keys in a long time.

Time t1: All of the data would be in the tail of the Kafka log (past the cleaner point).
Time t2: The job updates some keys. Now we're in a state where the next compaction will blow away the old values for those keys.
Time t3: Compaction occurs and old values are discarded.

Say we want to launch a re-processing job that would begin from t1. If we launch that job before t3, it will correctly restore its state. However, if we launch the job after t3, it will be missing old values, right?

Unless I'm misunderstanding something, the only way around this is to keep snapshots in addition to the changelog. Has there been any discussion of providing an option in Samza of taking RocksDB snapshots and persisting them to an object store or HDFS?

Thanks,
Roger
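[Editor's note: the t1/t2/t3 scenario in Roger's original message can be illustrated with a toy compaction model. This is a hypothetical sketch; Kafka's real log cleaner works segment-by-segment behind a cleaner point, but the effect on old values is the same.]

```python
def compact(log):
    """Toy model of Kafka log compaction: keep only the latest entry per key.

    log is a list of (key, value) pairs ordered by offset.
    """
    latest = {}
    for offset, (key, _value) in enumerate(log):
        latest[key] = offset  # remember the highest offset seen for each key
    return [entry for offset, entry in enumerate(log)
            if latest[entry[0]] == offset]

# t1: old values live in the tail of the log
log = [("user-1", "v1"), ("user-2", "v1")]
# t2: a key is updated, making its old value eligible for compaction
log.append(("user-1", "v2"))
# t3: after compaction runs, a job reprocessing from t1 can no longer
#     recover user-1's original value "v1" from the changelog
log = compact(log)
```

Before t3 a reprocessing job replaying the full log sees ("user-1", "v1"); after t3 that entry is gone, which is exactly why a changelog alone cannot restore state backward in time.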
Re: Reprocessing and windowing
Hi Geoffry,

You might find the Google MillWheel paper and recent talk relevant. That system supports windows based on event time as well as reprocessing.

Sent from my iPhone

On Feb 23, 2015, at 4:49 PM, Geoffry Sumter vit...@gmail.com wrote:

Hey everyone,

I've been thinking about reprocessing (http://samza.apache.org/learn/documentation/0.7.0/jobs/reprocessing.html) when my job has windowed state (http://samza.apache.org/learn/documentation/0.7.0/container/state-management.html#windowed-aggregation) and I have a few questions.

Context: I have a series of physical sensors that stream partial scans of their surroundings over the course of ~5-10 minutes (at the end of 5-10 minutes, each has provided a full scan of its surroundings and starts over from the beginning). Each packet of data has a timestamp that we're just going to have to trust is "close enough." When processing in real time, it's very natural to window the data every 5 minutes (wall clock) and merge into a complete view of their collective surroundings. For our purposes, data arriving 5 minutes late is no longer useful for many applications.

Now, I'd love to be able to reprocess data, especially by increasing parallelism and processing quickly, but this seems incompatible with using wall-clock-based windowed state. I could base my windowing/binning on the timestamps provided by my input data, but then I have to be careful to handle cases where some of my data may be arbitrarily delayed, and the possibility that one partition will get significantly ahead of the others (less interesting surroundings and faster computations), which makes waiting for a majority of partitions to reach a certain point in time difficult.

Does anyone have experience with windowing and reprocessing? Any literature recommendations?

Thanks!
Geoffry
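[Editor's note: the event-time binning Geoffry is weighing against wall-clock windows can be sketched in a few lines. Hypothetical helpers for illustration; systems like MillWheel track a low watermark per source rather than the single scalar used here.]

```python
def assign_window(event_ts, window_s=300):
    """Bin an event into a fixed 5-minute window by its own (event-time)
    timestamp, so the same input replays into the same windows during
    reprocessing, regardless of processing speed or parallelism."""
    return event_ts - (event_ts % window_s)

def accept(event_ts, watermark_ts, allowed_lateness_s=300):
    """Mirror the 'data arriving 5 minutes late is no longer useful' rule:
    drop events more than allowed_lateness_s behind the watermark, where
    the watermark approximates how far event time has progressed."""
    return event_ts >= watermark_ts - allowed_lateness_s
```

The trade-off Geoffry raises shows up in how the watermark is computed: taking the minimum event time across partitions stalls on a slow partition, while taking the maximum drops late data from it.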
Log error deploying on YARN [Samza 0.8.0]
Hi all,

I'm new to YARN and trying to have YARN download the Samza job tarball (https://samza.apache.org/learn/tutorials/0.8/run-in-multi-node-yarn.html). From the log, it seems that the download failed. I've tested that the file is available via curl. The error message is: org/apache/samza/util/Logging

I appreciate any suggestions.

Roger

2015-03-24 17:13:05,469 INFO [Socket Reader #1 for port 33749] ipc.Server (Server.java:saslProcess(1294)) - Auth successful for appattempt_1427226422217_0005_02 (auth:SIMPLE)
2015-03-24 17:13:05,473 INFO [IPC Server handler 15 on 33749] containermanager.ContainerManagerImpl (ContainerManagerImpl.java:startContainerInternal(572)) - Start request for container_1427226422217_0005_02_01 by user opintel
2015-03-24 17:13:05,473 INFO [IPC Server handler 15 on 33749] nodemanager.NMAuditLogger (NMAuditLogger.java:logSuccess(89)) - USER=opintel IP=10.53.152.54 OPERATION=Start Container Request TARGET=ContainerManageImpl RESULT=SUCCESS APPID=application_1427226422217_0005 CONTAINERID=container_1427226422217_0005_02_01
2015-03-24 17:13:05,473 INFO [AsyncDispatcher event handler] application.Application (ApplicationImpl.java:transition(296)) - Adding container_1427226422217_0005_02_01 to application application_1427226422217_0005
2015-03-24 17:13:05,474 INFO [AsyncDispatcher event handler] container.Container (ContainerImpl.java:handle(884)) - Container container_1427226422217_0005_02_01 transitioned from NEW to LOCALIZING
2015-03-24 17:13:05,474 INFO [AsyncDispatcher event handler] containermanager.AuxServices (AuxServices.java:handle(175)) - Got event CONTAINER_INIT for appId application_1427226422217_0005
2015-03-24 17:13:05,475 INFO [AsyncDispatcher event handler] localizer.LocalizedResource (LocalizedResource.java:handle(196)) - Resource http://somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz transitioned from INIT to DOWNLOADING
2015-03-24 17:13:05,475 INFO [AsyncDispatcher event handler] localizer.ResourceLocalizationService (ResourceLocalizationService.java:handle(596)) - Created localizer for container_1427226422217_0005_02_01
2015-03-24 17:13:05,480 INFO [LocalizerRunner for container_1427226422217_0005_02_01] localizer.ResourceLocalizationService (ResourceLocalizationService.java:writeCredentials(1029)) - Writing credentials to the nmPrivate file /tmp/hadoop-opintel/nm-local-dir/nmPrivate/container_1427226422217_0005_02_01.tokens. Credentials list:
2015-03-24 17:13:05,481 INFO [LocalizerRunner for container_1427226422217_0005_02_01] nodemanager.DefaultContainerExecutor (DefaultContainerExecutor.java:createUserCacheDirs(469)) - Initializing user opintel
2015-03-24 17:13:05,492 INFO [LocalizerRunner for container_1427226422217_0005_02_01] nodemanager.DefaultContainerExecutor (DefaultContainerExecutor.java:startLocalizer(103)) - Copying from /tmp/hadoop-opintel/nm-local-dir/nmPrivate/container_1427226422217_0005_02_01.tokens to /tmp/hadoop-opintel/nm-local-dir/usercache/opintel/appcache/application_1427226422217_0005/container_1427226422217_0005_02_01.tokens
2015-03-24 17:13:05,492 INFO [LocalizerRunner for container_1427226422217_0005_02_01] nodemanager.DefaultContainerExecutor (DefaultContainerExecutor.java:startLocalizer(105)) - CWD set to /tmp/hadoop-opintel/nm-local-dir/usercache/opintel/appcache/application_1427226422217_0005 = file:/tmp/hadoop-opintel/nm-local-dir/usercache/opintel/appcache/application_1427226422217_0005
2015-03-24 17:13:05,520 INFO [IPC Server handler 4 on 8040] localizer.ResourceLocalizationService (ResourceLocalizationService.java:update(928)) - DEBUG: FAILED { http://somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz, 0, ARCHIVE, null }, org/apache/samza/util/Logging
2015-03-24 17:13:05,520 INFO [IPC Server handler 4 on 8040] localizer.LocalizedResource (LocalizedResource.java:handle(196)) - Resource http://somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz transitioned from DOWNLOADING to FAILED
2015-03-24 17:13:05,520 INFO [AsyncDispatcher event handler] container.Container (ContainerImpl.java:handle(884)) - Container container_1427226422217_0005_02_01 transitioned from LOCALIZING to LOCALIZATION_FAILED
2015-03-24 17:13:05,521 INFO [AsyncDispatcher event handler] localizer.LocalResourcesTrackerImpl (LocalResourcesTrackerImpl.java:handle(137)) - Container container_1427226422217_0005_02_01 sent RELEASE event on a resource request { http://somehost.fake.com/samza/web-log-0.0.1-dist.tar.gz, 0, ARCHIVE, null } not present in cache.
2015-03-24 17:13:05,521 WARN [LocalizerRunner for container_1427226422217_0005_02_01] ipc.Client (Client.java:call(1388)) - interrupted waiting to send rpc request to server
java.lang.InterruptedException
    at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:400)
    at java.util.concurrent.FutureTask.get(FutureTask.java:187)
    at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1029)
    at
Re: Kafka topic naming conventions
Renato, Thanks for the link. Some interesting suggests there as well. On Thu, Mar 19, 2015 at 2:28 AM, Renato Marroquín Mogrovejo renatoj.marroq...@gmail.com wrote: There was an interesting discussion over in the kafka mailing list that might give you more ideas Roger. Although they don't mention anything about the number of partitions when doing so, anyways maybe it helps. Renato M. [1] https://www.mail-archive.com/users@kafka.apache.org/msg11976.html 2015-03-19 5:43 GMT+01:00 Roger Hoover roger.hoo...@gmail.com: Thanks, guys. I was also playing around with including partition count and even the partition key in the topic name. My thought was that topics may have the same data and number of partitions but only differ by partition key. After a while, the naming does get crazy (too long and ugly). We really need a topic metatdata store. On Wed, Mar 18, 2015 at 6:21 PM, Chinmay Soman chinmay.cere...@gmail.com wrote: Yeah ! It does seem a bit hackish - but I think this approach promises less config/operation errors. Although I think some of these checks can be built within Samza - assuming Kafka has a metadata store in the near future - the Samza container can validate the #topics against this store. On Wed, Mar 18, 2015 at 6:16 PM, Chris Riccomini criccom...@apache.org wrote: Hey Chinmay, Cool, this is good feedback. I didn't think I was *that* crazy. :) Cheers, Chris On Wed, Mar 18, 2015 at 6:10 PM, Chinmay Soman chinmay.cere...@gmail.com wrote: Thats what we're doing as well - appending partition count to the kafka topic name. This actually helps keep track of the #partitions for each topic (since Kafka doesn't have a Metadata store yet). In case of topic expansion - we actually just resort to creating a new topic. Although that is an overhead - the thought process is that this will minimize operational errors. Also, this is necessary to do in case we're doing some kind of joins. 
On Wed, Mar 18, 2015 at 5:59 PM, Jakob Homan jgho...@gmail.com wrote: On 18 March 2015 at 17:48, Chris Riccomini criccom...@apache.org wrote: One thing I haven't seen, but might be relevant, is including partition counts in the topic. Yeah, but then if you change the partition count later on, you've got incorrect information forever. Or you need to create a new stream, which might be a nice forcing function to make sure your join isn't screwed up. There'd need to be something somewhere to enforce that though. -- Thanks and regards Chinmay Soman -- Thanks and regards Chinmay Soman
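The convention the thread converges on (base name plus partition count, optionally the partition key) can be sketched as a small helper. This is an illustrative sketch only; the `web-log.by_user.16p` format and both functions are hypothetical, not an established convention:

```python
# Sketch of a topic naming scheme that encodes the partition count (and
# optionally the partition key) in the name, so re-partitioned copies of the
# same data get distinct, self-describing topic names. Format is hypothetical.

def topic_name(base, partitions, partition_key=None):
    """Build a name like 'web-log.by_user.16p'."""
    parts = [base]
    if partition_key:
        parts.append("by_" + partition_key)
    parts.append("%dp" % partitions)
    return ".".join(parts)

def parse_topic(name):
    """Recover (base, partition_key, partitions) from a conforming name."""
    pieces = name.split(".")
    partitions = int(pieces[-1].rstrip("p"))
    rest = pieces[:-1]
    key = None
    if rest and rest[-1].startswith("by_"):
        key = rest[-1][3:]
        rest = rest[:-1]
    return ".".join(rest), key, partitions
```

As Jakob points out, the encoded count becomes stale if the topic is later expanded; the parse helper is only as trustworthy as the discipline of creating a new topic on expansion.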
Re: How do you serve the data computed by Samza?
Is it because the Kafka partitioning might not be the same as the storage partitioning? So that a slow storage shard will prevent unrelated shards from getting their messages? Ah, I think I see what you mean. If so, then the solution is to make the Kafka partitioning match the storage partitioning. In that case, push or pull is the same, yeah? Thanks, Roger On Thu, Apr 2, 2015 at 3:21 PM, Roger Hoover roger.hoo...@gmail.com wrote: Chinmay, Thanks for your input. I'm not understanding what the difference is. With the design that Felix laid out, the co-located Kafka consumer is still doing a push to the storage system, right? It just happens to be on the same machine. How is this different from pushing batches from a non-local Samza job? How does the pull-based approach you're thinking of deal with feedback and SLAs? Thanks, Roger On Thu, Apr 2, 2015 at 2:54 PM, Chinmay Soman chinmay.cere...@gmail.com wrote: My 2 cents: One thing to note about the push model is multi-tenancy. When your storage system (Druid for example) is used in a multi-tenant fashion - then the push model is a bit difficult to operate. Primarily because there is no real feedback loop from the storage system. Yes - if the storage system starts doing badly - then you get timeouts and higher latencies - but then you're already in a position where you're probably breaking SLAs (for some tenant). In that sense, a pull model might be better since the consumer can potentially have more visibility into how this particular node is doing. Also, the Kafka consumer batches things up - so theoretically - you could get similar throughput. The downside of this approach is of course that the storage system partitioning scheme *has to* line up with the Kafka partitioning scheme. On Thu, Apr 2, 2015 at 11:41 AM, Roger Hoover roger.hoo...@gmail.com wrote: Felix, I see your point about simple Kafka consumers.
My thought was that if you're already managing a Samza/YARN deployment then these types of jobs would be just another job and not require an additional process management/monitoring/operations setup. If you've already got a way to handle vanilla Kafka jobs then it makes sense. For the push model, the way we're planning to deal with the latency of round-trip calls is to batch up pushes to the downstream system. Both Druid Tranquility and the ES transport node protocol allow you to batch index requests. I'm curious if pull would be that much more efficient. Cheers, Roger On Wed, Apr 1, 2015 at 10:26 AM, Felix GV fville...@linkedin.com.invalid wrote: Hi Roger, You bring up good points, and I think the short answer is that there are trade-offs to everything, of course (: What I described could definitely be implemented as a Samza job, and I think that would make a lot of sense if the data serving system was also deployed via YARN. This way, the Samza tasks responsible for ingesting and populating the data serving system's nodes could be spawned wherever YARN knows these nodes are located. For data serving systems not well integrated with YARN, however, I'm not sure that there would be that much win in using the Samza deployment model. And since the consumers themselves are pretty simple (no joining of streams, no local state, etc.), this seems to be a case where Samza is a bit overkill and a regular Kafka consumer is perfectly fine (except for the YARN-enabled auto-deployment aspect, like I mentioned). As for push versus pull, I think the trade-off is the following: push is mostly simpler and more decoupled, as you said, but I think pull would be more efficient. The reason for that is that Kafka consumption is very efficient (thanks to batching and compression), but most data serving systems don't provide a streaming ingest API for pushing data efficiently to them; instead, they have single-record put/insert APIs which require a round-trip to be acknowledged.
This is perfectly fine in low-throughput scenarios, but does not support very high throughput of ingestion like Kafka can provide. By co-locating the pulling process (i.e.: Kafka consumer) with the data serving node, it makes it a bit more affordable to do single puts since the (local) round-trip acks would be near-instantaneous. Pulling also makes the tracking of offsets across different nodes a bit easier, since each node can consume at its own pace, and resume at whatever point in the past it needs (i.e.: rewind) without affecting the other replicas. Tracking offsets across many replicas in the push model is a bit more annoying, though still doable, of course. -- Felix GV Data Infrastructure Engineer Distributed Data Systems LinkedIn f...@linkedin.com linkedin.com/in/felixgv From: Roger Hoover [roger.hoo
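The batching idea Roger and Felix discuss (amortizing the acknowledged round-trip over many records) can be sketched as follows. This is an illustrative sketch; `store.bulk_put` is a hypothetical stand-in for a real bulk API such as an Elasticsearch bulk index request or a Tranquility batch:

```python
# Sketch: buffer records and flush them to the downstream store in bulk,
# so one acknowledged round-trip covers many records instead of one each.
# The `store` object and its bulk_put() method are hypothetical.

class BatchingPusher:
    def __init__(self, store, max_batch=500):
        self.store = store
        self.max_batch = max_batch
        self.buffer = []

    def push(self, record):
        self.buffer.append(record)
        if len(self.buffer) >= self.max_batch:
            self.flush()

    def flush(self):
        if self.buffer:
            self.store.bulk_put(self.buffer)  # one round-trip per batch
            self.buffer = []
```

Whether this runs as a push from a remote Samza task or as a pull by a co-located consumer, the batching is the same; co-location mainly makes the per-batch acknowledgement near-instantaneous.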
Re: Joining Avro records
Hi all, In case this helps anyone, I was able to create a simple class to do the join and it works nicely for my use case. It assumes you have schemas for the input and output records. Example ( https://github.com/Quantiply/rico/blob/master/avro-serde/src/test/java/com/quantiply/avro/JoinTest.java#L44-L52 ):

GenericRecord in1 = getIn1();
GenericRecord in2 = getIn2();
GenericRecord joined = new Join(getJoinedSchema())
    .merge(in1)
    .merge(in2)
    .getBuilder()
    .set("charlie", "blah blah")
    .build();

The class is here: https://github.com/Quantiply/rico/blob/master/avro-serde/src/main/java/com/quantiply/avro/Join.java Cheers, Roger On Thu, Apr 9, 2015 at 12:54 PM, Roger Hoover roger.hoo...@gmail.com wrote: Yi Pan, Thanks for your response. I'm thinking that I'll iterate over the fields of the input schemas (similar to this https://github.com/apache/samza/blob/samza-sql/samza-sql/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java#L58-L62), match them up with the output schema and then copy the values. I'll let you know how it goes in case it's useful. Cheers, Roger On Thu, Apr 9, 2015 at 12:07 PM, Yi Pan nickpa...@gmail.com wrote: Hi, Roger, Good question on that. I am actually not aware of any automatic way of doing this in Avro. I have tried to add generic Schema and Data interfaces in the samza-sql branch to address the morphing of the schemas from input streams to the output streams. The basic idea is to have wrapper Schema and Data classes on top of the deserialized objects to access the data fields according to the schema w/o changing and copying the actual data fields. Hence, when there is a need to morph the input data schemas into a new output data schema, we just need an implementation of the new output data Schema class that can read the corresponding data fields from the input data and write them out in the output schema. An interface function transform() is added in the Schema class for this exact purpose.
Currently, it only takes one input data, and an example of a projection transformation can be found in the implementation of the AvroSchema class. A join case as you presented may well be a reason to have an implementation of join with multiple input data. All of the above is still experimental, so please feel free to provide your feedback and comments. If we agree that this solution is good and suited to a broader use case, it can be considered for use outside the SQL context as well. Best regards! -Yi On Thu, Apr 9, 2015 at 8:55 AM, Roger Hoover roger.hoo...@gmail.com wrote: Hi Milinda and others, This is an Avro question but since you guys are working on Avro support for stream SQL, I thought I'd ask you for help. If I have two records of type A and B as below and want to join them similar to SELECT * in SQL to produce a record of type AB, is there a simple way to do this with Avro without writing code to copy each field individually? I appreciate any help. Thanks, Roger

{"name": "A", "type": "record", "namespace": "fubar", "fields": [{"name": "a", "type": "int"}]}
{"name": "B", "type": "record", "namespace": "fubar", "fields": [{"name": "b", "type": "int"}]}
{"name": "AB", "type": "record", "namespace": "fubar", "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "int"}]}
Re: Errors and hung job on broker shutdown
At error-level logging, this was the only entry in the Samza log: 2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2] ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,2] offset[9129395] Unable to send message from TaskName-Partition 1 to system kafka Here is the log from the Kafka broker that was shut down: http://pastebin.com/afgmLyNF Thanks, Roger On Tue, Apr 28, 2015 at 3:49 PM, Yi Pan nickpa...@gmail.com wrote: Roger, could you paste the full log from the Samza container? If you can figure out which Kafka broker the message was sent to, it would be helpful if we get the log from that broker as well. On Tue, Apr 28, 2015 at 3:31 PM, Roger Hoover roger.hoo...@gmail.com wrote: Hi, I need some help figuring out what's going on. I'm running Kafka 0.8.2.1 and Samza 0.9.0 on YARN. All the topics have a replication factor of 2. I'm bouncing the Kafka broker using SIGTERM (with controlled.shutdown.enable=true). I see the Samza job log this message and then hang (it neither exits nor makes any progress).
2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2] ssp[kafka,my-topic,2] offset[9129395] Unable to send message from TaskName-Partition 1 to system kafka

The Kafka consumer (a Druid Real-Time node) on the other side then barfs on the message:

Exception in thread chief-svc-perf kafka.message.InvalidMessageException: Message is corrupt (stored crc = 1792882425, computed crc = 3898271689)
  at kafka.message.Message.ensureValid(Message.scala:166)
  at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:101)
  at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
  at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
  at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
  at io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.hasMore(KafkaEightFirehoseFactory.java:106)
  at io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:234)

My questions are: 1) What is the right way to bounce a Kafka broker? 2) Is this a bug in Samza, that the job hangs after a producer request fails? Has anyone seen this? Thanks, Roger
Errors and hung job on broker shutdown
Hi, I need some help figuring out what's going on. I'm running Kafka 0.8.2.1 and Samza 0.9.0 on YARN. All the topics have a replication factor of 2. I'm bouncing the Kafka broker using SIGTERM (with controlled.shutdown.enable=true). I see the Samza job log this message and then hang (it neither exits nor makes any progress).

2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2] ssp[kafka,my-topic,2] offset[9129395] Unable to send message from TaskName-Partition 1 to system kafka

The Kafka consumer (a Druid Real-Time node) on the other side then barfs on the message:

Exception in thread chief-svc-perf kafka.message.InvalidMessageException: Message is corrupt (stored crc = 1792882425, computed crc = 3898271689)
  at kafka.message.Message.ensureValid(Message.scala:166)
  at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:101)
  at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
  at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
  at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
  at io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.hasMore(KafkaEightFirehoseFactory.java:106)
  at io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:234)

My questions are: 1) What is the right way to bounce a Kafka broker? 2) Is this a bug in Samza, that the job hangs after a producer request fails? Has anyone seen this? Thanks, Roger
Re: Errors and hung job on broker shutdown
Guozhang and Yan, Thank you both for your responses. I tried a lot of combinations and I think I've determined that it's the new producer + snappy that causes the issue. It never happens with the old producer and it never happens with lz4 or no compression. It only happens when a broker gets restarted (or maybe just shut down). The error is not always the same. I've noticed at least three types of errors on the Kafka brokers:

1) java.io.IOException: failed to read chunk at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:356) http://pastebin.com/NZrrEHxU
2) java.lang.OutOfMemoryError: Java heap space at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:346) http://pastebin.com/yuxk1BjY
3) java.io.IOException: PARSING_ERROR(2) at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) http://pastebin.com/yq98Hx49

I've noticed a couple of different behaviors from the Samza producer/job:

A) It goes into a long retry loop where this message is logged. I saw this with error #1 above. 2015-04-29 18:17:31 Sender [WARN] task[Partition 7] ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,7] offset[253] Got error produce response with correlation id 4878 on topic-partition svc.call.w_deploy.T2UDe2PWRYWcVAAAhMOAwA-1, retrying (2147483646 attempts left). Error: CORRUPT_MESSAGE

B) The job exits with org.apache.kafka.common.errors.UnknownServerException (at least when run as ThreadJob). I saw this with error #3 above. org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 6 to system kafka. org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request

This seems most likely to be a bug in the new Kafka producer. I'll probably file a JIRA for that project.
Thanks, Roger On Wed, Apr 29, 2015 at 7:38 PM, Guozhang Wang wangg...@gmail.com wrote: And just to answer your first question: SIGTERM with controlled.shutdown=true should be OK for bouncing the broker. Guozhang On Wed, Apr 29, 2015 at 7:36 PM, Guozhang Wang wangg...@gmail.com wrote: Roger, I believe Samza 0.9.0 already uses the Java producer. The Java producer's close() call will try to flush all buffered data to the brokers before completing the call. However, if some buffered data's destination partition leader is not known, the producer will block on refreshing the metadata and then retry sending. From the broker logs, it seems it does receive the producer request but failed to handle it due to "Leader not local" after the bounce:

[2015-04-28 14:26:44,729] WARN [KafkaApi-0] Produce request with correlation id 226 from client samza_producer-svc_call_w_deploy_to_json-1-1430244278081-3 on partition [sys.samza_metrics,0] failed due to Leader not local for partition [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
[2015-04-28 14:26:47,426] WARN [KafkaApi-0] Produce request with correlation id 45671 from client samza_checkpoint_manager-svc_call_join_deploy-1-1429911482243-4 on partition [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] failed due to Leader not local for partition [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] on broker 0 (kafka.server.KafkaApis)
[2015-04-28 14:27:24,578] WARN [KafkaApi-0] Produce request with correlation id 12267 from client samza_producer-svc_call_join_deploy-1-1429911471254-0 on partition [sys.samza_metrics,0] failed due to Leader not local for partition [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)

because for these two topic-partitions ([sys.samza_metrics,0] and [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0]), their leader has been moved to broker id:1,host:sit320w80m7,port:9092.
When the producer gets the error code from the old leader, it should refresh its metadata and get the new leader as broker-1, and retry sending, but for some reason it does not refresh its metadata. Without producer logs from Samza container I cannot further investigate the issue. Which Kafka version does Samza 0.9.0 use? Guozhang On Wed, Apr 29, 2015 at 4:30 PM, Yan Fang yanfang...@gmail.com wrote: Not sure about the Kafka side. From the Samza side, from your description ( does not exit nor does it make any progress ), I think the code is stuck in producer.close https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala#L143 , otherwise, it will throw SamzaException to quit the job. So maybe some Kafka experts in this mailing list or Kafka mailing list can help Fang, Yan yanfang...@gmail.com On Tue, Apr 28, 2015 at 5:35 PM, Roger Hoover roger.hoo...@gmail.com wrote: At error level logging, this was the only entry in the Samza log: 2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2] ssp[kafka
Re: How to configure the Resource Manager endpoint for YARN?
I'll try that. Thanks, Chris. On Wed, Apr 15, 2015 at 9:37 AM, Chris Riccomini criccom...@apache.org wrote: Hey Roger, Not sure if this makes a difference, but have you tried using export YARN_CONF_DIR=... instead? This is what we use. Cheers, Chris On Wed, Apr 15, 2015 at 9:33 AM, Roger Hoover roger.hoo...@gmail.com wrote: Hi, I'm trying to deploy a job to a small YARN cluster. How do I tell the launcher script where to find the Resource Manager? I tried creating a yarn-site.xml and setting the HADOOP_CONF_DIR environment variable, but it doesn't find my config. 2015-04-14 22:02:45 ClientHelper [INFO] trying to connect to RM 0.0.0.0:8032 2015-04-14 22:02:45 RMProxy [INFO] Connecting to ResourceManager at /0.0.0.0:8032 Thanks, Roger
How to configure the Resource Manager endpoint for YARN?
Hi, I'm trying to deploy a job to a small YARN cluster. How do I tell the launcher script where to find the Resource Manager? I tried creating a yarn-site.xml and setting the HADOOP_CONF_DIR environment variable, but it doesn't find my config. 2015-04-14 22:02:45 ClientHelper [INFO] trying to connect to RM 0.0.0.0:8032 2015-04-14 22:02:45 RMProxy [INFO] Connecting to ResourceManager at /0.0.0.0:8032 Thanks, Roger
Re: Joining Avro records
Thanks, Julian. Good point about needing aliasing for unique names in SQL. I didn't know about array_agg...nice. On Thu, Apr 9, 2015 at 12:35 PM, Julian Hyde jul...@hydromatic.net wrote: Much of this is about mapping from logical fields (i.e. the fields you can reference in SQL) down to the Avro representation; I’m no expert on that mapping, so I’ll focus on the SQL stuff. First, SQL doesn’t allow a record to have two fields of the same name, so you wouldn’t be allowed to have two “name” fields. When you do a join, you might need to alias output columns: select stream orders.id, products.id as productId from orders join products on orders.id = products.id; Second, JOIN isn’t the only SQL operator that combines records; GROUP BY also combines records. JOIN combines records from different streams, and they usually have different types (i.e. different numbers/types of fields), whereas GROUP BY combines records from the same stream. Use whichever best suits your purpose. select stream zipcode, floor(rowtime to hour), array_agg(orderid) as orderIds from orders group by zipcode, floor(rowtime to hour) (array_agg is an aggregate function, recently added to the SQL standard, that gathers input values into an array. See http://www.craigkerstiens.com/2013/04/17/array-agg/.) Output: { zipcode: “94705”, rowtime: “2015-04-09 11:00:00”, orderIds: [123, 156, 1056] }, { zipcode: “94117”, rowtime: “2015-04-09 11:00:00”, orderIds: [45, 777] }, { zipcode: “94705”, rowtime: “2015-04-09 12:00:00”, orderIds: [55] } Julian On Apr 9, 2015, at 12:07 PM, Yi Pan nickpa...@gmail.com wrote: Hi, Roger, Good question on that. I am actually not aware of any automatic way of doing this in Avro. I have tried to add generic Schema and Data interface in samza-sql branch to address the morphing of the schemas from input streams to the output streams. 
The basic idea is to have wrapper Schema and Data classes on top of the deserialized objects to access the data fields according to the schema w/o changing and copying the actual data fields. Hence, when there is a need to morph the input data schemas into a new output data schema, we just need an implementation of the new output data Schema class that can read the corresponding data fields from the input data and write them out in the output schema. An interface function transform() is added in the Schema class for this exact purpose. Currently, it only takes one input data, and an example of a projection transformation can be found in the implementation of the AvroSchema class. A join case as you presented may well be a reason to have an implementation of join with multiple input data. All of the above is still experimental, so please feel free to provide your feedback and comments. If we agree that this solution is good and suited to a broader use case, it can be considered for use outside the SQL context as well. Best regards! -Yi On Thu, Apr 9, 2015 at 8:55 AM, Roger Hoover roger.hoo...@gmail.com wrote: Hi Milinda and others, This is an Avro question but since you guys are working on Avro support for stream SQL, I thought I'd ask you for help. If I have two records of type A and B as below and want to join them similar to SELECT * in SQL to produce a record of type AB, is there a simple way to do this with Avro without writing code to copy each field individually? I appreciate any help. Thanks, Roger

{"name": "A", "type": "record", "namespace": "fubar", "fields": [{"name": "a", "type": "int"}]}
{"name": "B", "type": "record", "namespace": "fubar", "fields": [{"name": "b", "type": "int"}]}
{"name": "AB", "type": "record", "namespace": "fubar", "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "int"}]}
Re: Joining Avro records
Yi Pan, Thanks for your response. I'm thinking that I'll iterate over the fields of the input schemas (similar to this https://github.com/apache/samza/blob/samza-sql/samza-sql/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java#L58-L62), match them up with the output schema and then copy the values. I'll let you know how it goes in case it's useful. Cheers, Roger On Thu, Apr 9, 2015 at 12:07 PM, Yi Pan nickpa...@gmail.com wrote: Hi, Roger, Good question on that. I am actually not aware of any automatic way of doing this in Avro. I have tried to add generic Schema and Data interfaces in the samza-sql branch to address the morphing of the schemas from input streams to the output streams. The basic idea is to have wrapper Schema and Data classes on top of the deserialized objects to access the data fields according to the schema w/o changing and copying the actual data fields. Hence, when there is a need to morph the input data schemas into a new output data schema, we just need an implementation of the new output data Schema class that can read the corresponding data fields from the input data and write them out in the output schema. An interface function transform() is added in the Schema class for this exact purpose. Currently, it only takes one input data, and an example of a projection transformation can be found in the implementation of the AvroSchema class. A join case as you presented may well be a reason to have an implementation of join with multiple input data. All of the above is still experimental, so please feel free to provide your feedback and comments. If we agree that this solution is good and suited to a broader use case, it can be considered for use outside the SQL context as well. Best regards! -Yi On Thu, Apr 9, 2015 at 8:55 AM, Roger Hoover roger.hoo...@gmail.com wrote: Hi Milinda and others, This is an Avro question but since you guys are working on Avro support for stream SQL, I thought I'd ask you for help.
If I have two records of type A and B as below and want to join them similar to SELECT * in SQL to produce a record of type AB, is there a simple way to do this with Avro without writing code to copy each field individually? I appreciate any help. Thanks, Roger

{"name": "A", "type": "record", "namespace": "fubar", "fields": [{"name": "a", "type": "int"}]}
{"name": "B", "type": "record", "namespace": "fubar", "fields": [{"name": "b", "type": "int"}]}
{"name": "AB", "type": "record", "namespace": "fubar", "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "int"}]}
Joining Avro records
Hi Milinda and others, This is an Avro question but since you guys are working on Avro support for stream SQL, I thought I'd ask you for help. If I have two records of type A and B as below and want to join them similar to SELECT * in SQL to produce a record of type AB, is there a simple way to do this with Avro without writing code to copy each field individually? I appreciate any help. Thanks, Roger

{"name": "A", "type": "record", "namespace": "fubar", "fields": [{"name": "a", "type": "int"}]}
{"name": "B", "type": "record", "namespace": "fubar", "fields": [{"name": "b", "type": "int"}]}
{"name": "AB", "type": "record", "namespace": "fubar", "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "int"}]}
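The SELECT *-style merge described in this thread boils down to copying each output-schema field from whichever input record provides it. A minimal illustrative sketch, with plain dicts standing in for Avro GenericRecords (this is not the Avro API):

```python
# Sketch of the field-by-field join: for each field in the output schema,
# take its value from the first input record that has it.

def join_records(output_fields, *inputs):
    out = {}
    for field in output_fields:
        for record in inputs:
            if field in record:
                out[field] = record[field]
                break
        else:
            raise KeyError("no input provides field %r" % field)
    return out
```

With the A/B/AB schemas above, `join_records(["a", "b"], record_a, record_b)` produces the AB-shaped record. A real Avro version would iterate `schema.getFields()` and use a GenericRecordBuilder, as Roger's Join class does.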
Re: Newbie questions after completing Hello Samza about performance and project setup
Hi Warren, Yes, I think Hello Samza is the template project to work from. I believe that the slow message rate you are seeing is because it's subscribed to the Wikipedia IRC stream, which may only generate a few events per second. That said, some of the example configuration for the Hello Samza demo is not tuned for performance. In general, enabling compression can help a lot for jobs that are I/O bound. Enabling lz4 on JSON data, for example, shrinks it 10x. On the consumer side, setting task.consumer.batch.size might help. On the producer side, you might want to play around with these settings:

systems.kafka.producer.compression.type
systems.kafka.producer.batch.size
systems.kafka.producer.linger.ms

http://samza.apache.org/learn/documentation/0.9/jobs/configuration-table.html http://kafka.apache.org/documentation.html#newproducerconfigs Cheers, Roger On Thu, Apr 9, 2015 at 1:14 AM, Warren Henning warren.henn...@gmail.com wrote: Hi, I ran the commands in http://samza.apache.org/startup/hello-samza/0.9/ successfully. Fascinating stuff! I was running all the processes on my (fairly recent model) Macbook Pro. One aspect I've heard about Kafka and Samza is performance -- handling thousands of messages a second. E.g., http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines talks about doing millions of writes a second. The rate at which the console emitted new messages seemed far slower than that -- maybe something on the order of 1-2 a second. I ran the commands and everything exactly as listed on the tutorial page. Of course a laptop is vastly different from a production setup -- what kind of assumptions can you make about the performance of Samza jobs in development mode? I realize it depends on what you're doing -- it's just very different from what I was expecting. Also, I'm not really sure about the best way to get started with writing my own Samza jobs.
Is there a project template to work off of? Is the Hello Samza project it? Maybe import the Maven POM into a favorite IDE and rip out the Wikipedia-related classes? As someone who has written Java before but doesn't write it every day, it wasn't immediately clear to me. Apologies if these are addressed in blog posts/FAQs/documentation and I failed to research them adequately. Thanks! Warren
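Pulled together, the tuning knobs Roger lists in his reply might look like this in a job's properties file. The values below are purely illustrative placeholders, not recommendations from the thread:

```properties
# Illustrative values only -- tune against your own workload.
task.consumer.batch.size=100
systems.kafka.producer.compression.type=lz4
systems.kafka.producer.batch.size=16384
systems.kafka.producer.linger.ms=10
```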
Re: How to configure log4j separately for the AM versus containers?
Ah, this seems to work. I saw that YarnJob.scala was referencing __package to launch the AM itself. yarn.am.opts=-Xmx768m -XX:+UseSerialGC -Dlog4j.configuration=file://$(pwd)/__package/lib/log4j-am.xml On Tue, Jun 23, 2015 at 12:40 PM, Roger Hoover roger.hoo...@gmail.com wrote: Hi, I want the App Master to log at INFO level and the container to log at ERROR. Is there a way to configure the AM to use a different log4j config file? I'm trying to set yarn.am.opts but couldn't get it to work with system properties. yarn.am.opts=-Xmx768m -XX:+UseSerialGC -Dlog4j.configuration=file://${user.dir}/lib/log4j-am.xml gives a 'bad substitution' error. This actually works, but it feels too ugly: yarn.am.opts=-Xmx768m -XX:+UseSerialGC -Dlog4j.configuration=file://$(pwd | cut -d/ -f-9 | xargs -IXXX -n1 find XXX -name log4j-am.xml -print) What's the right way to do it? Thanks, Roger
How to configure log4j separately for the AM versus containers?
Hi, I want the App Master to log at INFO level and the container to log at ERROR. Is there a way to configure the AM to use a different log4j config file? I'm trying to set yarn.am.opts but couldn't get it to work with system properties. yarn.am.opts=-Xmx768m -XX:+UseSerialGC -Dlog4j.configuration=file://${user.dir}/lib/log4j-am.xml gives a 'bad substitution' error. This actually works, but it feels too ugly: yarn.am.opts=-Xmx768m -XX:+UseSerialGC -Dlog4j.configuration=file://$(pwd | cut -d/ -f-9 | xargs -IXXX -n1 find XXX -name log4j-am.xml -print) What's the right way to do it? Thanks, Roger
Re: Samza hung after bootstrapping
Hi Yan, I've uploaded a file with TRACE-level logging here: http://filebin.ca/261yhsTZcZQZ/samza-container-0.log.gz I really appreciate your help as this is a critical issue for me. Thanks, Roger On Fri, Jun 19, 2015 at 12:05 PM, Yan Fang yanfang...@gmail.com wrote: Hi Roger, "but it only spawns one container and still hangs after bootstrap" -- this is probably because your local machine does not have enough resources for the second container. I checked your log file; each container is about 4GB. "When I run it on our YARN cluster with a single container, it works correctly. When I tried it with 5 containers, it gets hung after consuming the bootstrap topic." -- Have you figured it out? I have looked at your log and also the code. My suspicion is that there is a null envelope somehow blocking the process. If you can paste the trace-level log, it will be more helpful because many logs in the chooser are at trace level. Thanks, Fang, Yan yanfang...@gmail.com On Thu, Jun 18, 2015 at 5:20 PM, Roger Hoover roger.hoo...@gmail.com wrote: I need some help. I have a job which bootstraps one stream and then is supposed to read from two. When I run it on our YARN cluster with a single container, it works correctly. When I tried it with 5 containers, it gets hung after consuming the bootstrap topic. I ran it with the grid script on my laptop (Mac OS X) with yarn.container.count=2, but it only spawns one container and still hangs after bootstrap. Debug logs are here: http://pastebin.com/af3KPvju I looked at JMX metrics and see:

- Task Metrics: no value for the Kafka offset of the non-bootstrapped stream
- SystemConsumerMetrics: choose-null keeps incrementing; ssps-needed-by-chooser 1; unprocessed-messages 62k
- Bootstrapping Chooser: lagging-partitions 4; lagging-batch-streams 4; batch-resets 0

Has anyone seen this or can offer ideas of how to better debug it? I'm using Samza 0.9.0 and YARN 2.4.0. Thanks! Roger
Re: Samza hung after bootstrapping
I think I see what's happening. When there are 8 tasks and I set yarn.container.count=8, each container is responsible for a single task. However, the systemStreamLagCounts map ( https://github.com/apache/samza/blob/0.9.0/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala#L77) and laggingSystemStreamPartitions ( https://github.com/apache/samza/blob/0.9.0/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala#L83) are configured to track all partitions of the bootstrap topic rather than just the one partition assigned to this task. Later in the log, we see that the task/container completed bootstrap for its own partition: 2015-06-21 12:28:55 org.apache.samza.system.chooser.BootstrappingChooser [DEBUG] Bootstrap stream partition is fully caught up: SystemStreamPartition [kafka, deploy.svc.tlrnsZOYQA6wrwAA4FLqZA, 0] but the BootstrappingChooser still thinks that the remaining partitions (assigned to other tasks in other containers) need to be completed. JMX at this point shows 7 lagging partitions of the original 8. I'm wondering why no one has run into this. Doesn't LinkedIn use partitioned bootstrap topics? Thanks, Roger On Sun, Jun 21, 2015 at 12:22 PM, Roger Hoover roger.hoo...@gmail.com wrote: Hi Yan, I've uploaded a file with TRACE-level logging here: http://filebin.ca/261yhsTZcZQZ/samza-container-0.log.gz I really appreciate your help as this is a critical issue for me. Thanks, Roger On Fri, Jun 19, 2015 at 12:05 PM, Yan Fang yanfang...@gmail.com wrote: Hi Roger, "but it only spawns one container and still hangs after bootstrap" -- this is probably because your local machine does not have enough resources for the second container. I checked your log file; each container is about 4GB. "When I run it on our YARN cluster with a single container, it works correctly. When I tried it with 5 containers, it gets hung after consuming the bootstrap topic."
-- Have you figured it out? I have looked at your log and also the code. My suspicion is that a null envelope is somehow blocking the process. If you can paste the trace-level log, it will be more helpful because many of the logs in the chooser are at trace level. Thanks, Fang, Yan yanfang...@gmail.com On Thu, Jun 18, 2015 at 5:20 PM, Roger Hoover roger.hoo...@gmail.com wrote: I need some help. I have a job which bootstraps one stream and then is supposed to read from two. When I run it on our YARN cluster with a single container, it works correctly. When I tried it with 5 containers, it gets hung after consuming the bootstrap topic. I ran it with the grid script on my laptop (Mac OS X) with yarn.container.count=2 but it only spawns one container and still hangs after bootstrap. Debug logs are here: http://pastebin.com/af3KPvju I looked at JMX metrics and see: - Task Metrics - no value for kafka offset of non-bootstrapped stream - SystemConsumerMetrics - choose null keeps incrementing - ssps-needed-by-chooser 1 - unprocessed-messages 62k - Bootstrapping Chooser - lagging partitions 4 - lagging-batch-streams - 4 - batch-resets - 0 Has anyone seen this or can offer ideas of how to better debug it? I'm using Samza 0.9.0 and YARN 2.4.0. Thanks! Roger
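To make the failure mode described above concrete, here is a minimal, self-contained model of the bookkeeping problem (hypothetical names, not the actual BootstrappingChooser code): if the chooser's lag set is seeded with every partition of the bootstrap topic, a container that owns only one partition can never drain the set, so it waits forever; seeding it with only the task's assigned partitions lets bootstrap complete.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of the bootstrapping hang described above.
// Names are hypothetical; this is not the actual Samza chooser code.
class BootstrapTracker {
    private final Set<Integer> lagging = new HashSet<>();

    BootstrapTracker(Set<Integer> partitionsToTrack) {
        lagging.addAll(partitionsToTrack);
    }

    void markCaughtUp(int partition) {
        lagging.remove(partition);
    }

    boolean bootstrapComplete() {
        return lagging.isEmpty();
    }
}

public class BootstrapBugDemo {
    public static void main(String[] args) {
        // Buggy behavior: the tracker is seeded with all 8 partitions of the
        // bootstrap topic, even though this container owns only partition 0.
        Set<Integer> all = new HashSet<>();
        for (int p = 0; p < 8; p++) all.add(p);
        BootstrapTracker buggy = new BootstrapTracker(all);
        buggy.markCaughtUp(0); // the only partition this task consumes
        System.out.println("buggy complete: " + buggy.bootstrapComplete()); // false -> hangs

        // Fixed behavior: track only the partitions assigned to this container.
        BootstrapTracker fixed = new BootstrapTracker(Set.of(0));
        fixed.markCaughtUp(0);
        System.out.println("fixed complete: " + fixed.bootstrapComplete()); // true
    }
}
```

This matches the JMX observation in the thread: 7 of 8 partitions stay "lagging" because no consumer in this container will ever advance them.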
Re: [VOTE] Apache Samza 0.9.1 RC0
Hi all, Do you think we could get this bootstrapping bug fixed before the 0.9.1 release? It seems like a critical bug. https://issues.apache.org/jira/browse/SAMZA-720 Thanks, Roger On Sat, Jun 20, 2015 at 10:38 PM, Yan Fang yanfang...@gmail.com wrote: Agree. I will test it this weekend. Thanks, Fang, Yan yanfang...@gmail.com On Sat, Jun 20, 2015 at 3:46 PM, Guozhang Wang wangg...@gmail.com wrote: Since we only got one vote so far, I think I have to extend the vote deadline. Let's set it to next Monday 6pm. Please check the candidate and vote. Guozhang On Fri, Jun 19, 2015 at 10:03 AM, Yi Pan nickpa...@gmail.com wrote: +1. Ran the Samza failure test suite and it succeeded overnight. On Wed, Jun 17, 2015 at 5:54 PM, Guozhang Wang wangg...@gmail.com wrote: Hey all, This is a call for a vote on a release of Apache Samza 0.9.1. This is a bug-fix release against 0.9.0. The release candidate can be downloaded from here: http://people.apache.org/~guozhang/samza-0.9.1-rc0/ The release candidate is signed with pgp key 911402D8, which is included in the repository's KEYS file: https://git-wip-us.apache.org/repos/asf?p=samza.git;a=blob_plain;f=KEYS;hb=6f5bafb6cd93934781161eb6b1868d11ea347c95 and can also be found on keyservers: http://pgp.mit.edu/pks/lookup?op=get&search=0x911402D8 The git tag is release-0.9.1-rc0 and signed with the same pgp key: https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=e78b9e7f34650538b4bb68b338eb472b98a5709e Test binaries have been published to Maven's staging repository, and are available here: 5 critical bugs were resolved for this release: https://issues.apache.org/jira/browse/SAMZA-715?jql=project%20%3D%20SAMZA%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20%28Resolved%2C%20Closed%29 The vote will be open for 72 hours (ending 6:00pm Saturday, 06/20/2015). 
Please download the release candidate, check the hashes/signature, build it and test it, and then please vote: [ ] +1 approve [ ] +0 no opinion [ ] -1 disapprove (and reason why) -- Guozhang -- -- Guozhang
Re: Samza hung after bootstrapping
Thank you, Yan. I'll get a trace-level log as soon as I can. Sent from my iPhone On Jun 19, 2015, at 12:05 PM, Yan Fang yanfang...@gmail.com wrote: Hi Roger, but it only spawns one container and still hangs after bootstrap -- this is probably because your local machine does not have enough resources for the second container. I checked your log file, and each container is about 4GB. When I run it on our YARN cluster with a single container, it works correctly. When I tried it with 5 containers, it gets hung after consuming the bootstrap topic. -- Have you figured it out? I have looked at your log and also the code. My suspicion is that a null envelope is somehow blocking the process. If you can paste the trace-level log, it will be more helpful because many of the logs in the chooser are at trace level. Thanks, Fang, Yan yanfang...@gmail.com On Thu, Jun 18, 2015 at 5:20 PM, Roger Hoover roger.hoo...@gmail.com wrote: I need some help. I have a job which bootstraps one stream and then is supposed to read from two. When I run it on our YARN cluster with a single container, it works correctly. When I tried it with 5 containers, it gets hung after consuming the bootstrap topic. I ran it with the grid script on my laptop (Mac OS X) with yarn.container.count=2 but it only spawns one container and still hangs after bootstrap. Debug logs are here: http://pastebin.com/af3KPvju I looked at JMX metrics and see: - Task Metrics - no value for kafka offset of non-bootstrapped stream - SystemConsumerMetrics - choose null keeps incrementing - ssps-needed-by-chooser 1 - unprocessed-messages 62k - Bootstrapping Chooser - lagging partitions 4 - lagging-batch-streams - 4 - batch-resets - 0 Has anyone seen this or can offer ideas of how to better debug it? I'm using Samza 0.9.0 and YARN 2.4.0. Thanks! Roger
Samza hung after bootstrapping
I need some help. I have a job which bootstraps one stream and then is supposed to read from two. When I run it on our YARN cluster with a single container, it works correctly. When I tried it with 5 containers, it gets hung after consuming the bootstrap topic. I ran it with the grid script on my laptop (Mac OS X) with yarn.container.count=2 but it only spawns one container and still hangs after bootstrap. Debug logs are here: http://pastebin.com/af3KPvju I looked at JMX metrics and see: - Task Metrics - no value for kafka offset of non-bootstrapped stream - SystemConsumerMetrics - choose null keeps incrementing - ssps-needed-by-chooser 1 - unprocessed-messages 62k - Bootstrapping Chooser - lagging partitions 4 - lagging-batch-streams - 4 - batch-resets - 0 Has anyone seen this or can offer ideas of how to better debug it? I'm using Samza 0.9.0 and YARN 2.4.0. Thanks! Roger
Re: [VOTE] Apache Samza 0.9.1 RC0
Yan, I tested the patch locally and it looks good. Creating a patched release for myself to test in our environment. Thanks again. Sent from my iPhone On Jun 22, 2015, at 10:59 AM, Yi Pan nickpa...@gmail.com wrote: Hi, Yan, Thanks a lot for the quick fix on the mentioned bugs. It seems the fix for SAMZA-720 is pretty localized and I am OK to push it into 0.9.1. I will be working on backporting those changes to 0.9.1 later today and fixing all the release-related issues. Thanks! -Yi On Mon, Jun 22, 2015 at 10:30 AM, Roger Hoover roger.hoo...@gmail.com wrote: Yan, You rock. Thank you so much for the quick fix. I'm working on building and testing the patch. Cheers, Roger On Mon, Jun 22, 2015 at 1:09 AM, Yan Fang yanfang...@gmail.com wrote: Hi guys, 1. I am having difficulty building the 0.9.1 branch. I think this is mainly related to SAMZA-721 https://issues.apache.org/jira/browse/SAMZA-721. 2. Also, https://issues.apache.org/jira/browse/SAMZA-712 seems to be bothering people as well. 3. https://issues.apache.org/jira/browse/SAMZA-720 is a critical bug we need to fix. I have already attached a patch. 4. There is no maven staging link. Thanks, Fang, Yan yanfang...@gmail.com On Sun, Jun 21, 2015 at 1:53 PM, Roger Hoover roger.hoo...@gmail.com wrote: Hi all, Do you think we could get this bootstrapping bug fixed before the 0.9.1 release? It seems like a critical bug. https://issues.apache.org/jira/browse/SAMZA-720 Thanks, Roger On Sat, Jun 20, 2015 at 10:38 PM, Yan Fang yanfang...@gmail.com wrote: Agree. I will test it this weekend. Thanks, Fang, Yan yanfang...@gmail.com On Sat, Jun 20, 2015 at 3:46 PM, Guozhang Wang wangg...@gmail.com wrote: Since we only got one vote so far, I think I have to extend the vote deadline. Let's set it to next Monday 6pm. Please check the candidate and vote. Guozhang On Fri, Jun 19, 2015 at 10:03 AM, Yi Pan nickpa...@gmail.com wrote: +1. Ran the Samza failure test suite and it succeeded overnight. 
On Wed, Jun 17, 2015 at 5:54 PM, Guozhang Wang wangg...@gmail.com wrote: Hey all, This is a call for a vote on a release of Apache Samza 0.9.1. This is a bug-fix release against 0.9.0. The release candidate can be downloaded from here: http://people.apache.org/~guozhang/samza-0.9.1-rc0/ The release candidate is signed with pgp key 911402D8, which is included in the repository's KEYS file: https://git-wip-us.apache.org/repos/asf?p=samza.git;a=blob_plain;f=KEYS;hb=6f5bafb6cd93934781161eb6b1868d11ea347c95 and can also be found on keyservers: http://pgp.mit.edu/pks/lookup?op=get&search=0x911402D8 The git tag is release-0.9.1-rc0 and signed with the same pgp key: https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=e78b9e7f34650538b4bb68b338eb472b98a5709e Test binaries have been published to Maven's staging repository, and are available here: 5 critical bugs were resolved for this release: https://issues.apache.org/jira/browse/SAMZA-715?jql=project%20%3D%20SAMZA%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20%28Resolved%2C%20Closed%29 The vote will be open for 72 hours (ending 6:00pm Saturday, 06/20/2015). Please download the release candidate, check the hashes/signature, build it and test it, and then please vote: [ ] +1 approve [ ] +0 no opinion [ ] -1 disapprove (and reason why) -- Guozhang -- -- Guozhang
Re: Samza job throughput much lower than Kafka throughput
Oops. Sent too soon. I mean: producer.batch.size=262144 producer.linger.ms=5 producer.compression.type=lz4 On Thu, May 21, 2015 at 9:00 AM, Roger Hoover roger.hoo...@gmail.com wrote: Hi George, You might also try tweaking the producer settings. producer.batch.size=262144 producer.linger.ms=5 producer.compression.type: lz4 On Wed, May 20, 2015 at 9:30 PM, Guozhang Wang wangg...@gmail.com wrote: Hi George, Is there any reason you need to set the following config? systems.kafka.consumer.fetch.wait.max.ms=1 This setting will basically disable long polling of the consumer, which will then busy-fetch data from the broker; this has a large impact on network latency, especially when the consumer is already caught up with the Kafka broker. Also, when you say it is slower than a program reading directly from Kafka, which consumer did your program use to read data from Kafka? Guozhang On Wed, May 20, 2015 at 5:01 PM, George Li g...@ca.ibm.com wrote: Hi Yi, Thanks for the reply. Below is my job config and code. When we run this job inside our dev docker container, which has zookeeper, broker, and yarn installed locally, its throughput is at least 50% higher than our cluster run's. 
Thanks, George

Configuration:

job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=container-performance
# YARN
yarn.container.count=1
yarn.container.memory.mb=2548
yarn.package.path={my package on hdfs}
yarn.container.retry.count=0
yarn.am.container.memory.mb=2048
yarn.am.jmx.enabled=false
# Task
task.opts=-server -Xmx1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true
task.class=samza.TestPerformanceTask
task.inputs=kafka.throughput-test2
task.log.interval=100
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.checkpoint.replication.factor=1
# Kafka System (only used for coordinator stream in this test)
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.fetch.threshold=5
systems.kafka.consumer.zookeeper.connect={zookeeper}
systems.kafka.producer.bootstrap.servers={broker node}
systems.kafka.consumer.auto.offset.reset=smallest
systems.kafka.consumer.socket.receive.buffer.bytes=220
systems.kafka.consumer.fetch.message.max.bytes=110
systems.kafka.consumer.fetch.min.bytes=1
systems.kafka.consumer.fetch.wait.max.ms=1
#define coordinator system
job.coordinator.system=kafka
job.coordinator.replication.factor=1
systems.kafka.streams.throughput-test2.samza.reset.offset=true
systems.kafka.streams.throughput-test2.samza.offset.default=oldest

Job's code. This is mostly a copy-paste of the one in the repository:

object TestPerformanceTask {
  // No thread safety is needed for these variables because they're mutated in
  // the process method, which is single threaded.
  var messagesProcessed = 0
  var startTime = 0L
}

class TestPerformanceTask extends StreamTask with InitableTask with Logging {
  import TestPerformanceTask._

  /** How many messages to process before a log message is printed. */
  var logInterval = 1

  /** How many messages to process before shutting down. */
  var maxMessages = 1000

  var outputSystemStream: Option[SystemStream] = None

  def init(config: Config, context: TaskContext) {
    logInterval = config.getInt("task.log.interval", 1)
    maxMessages = config.getInt("task.max.messages", 1000)
    outputSystemStream = Option(config.get("task.outputs", null)).map(Util.getSystemStreamFromNames(_))
    println("init!!")
  }

  def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
    if (startTime == 0) {
      startTime = System.currentTimeMillis
    }
    if (outputSystemStream.isDefined) {
      collector.send(new OutgoingMessageEnvelope(outputSystemStream.get, envelope.getKey, envelope.getMessage))
    }
    messagesProcessed += 1
    if (messagesProcessed % logInterval == 0) {
      val seconds = (System.currentTimeMillis - startTime) / 1000
      println("Processed %s messages in %s seconds." format (messagesProcessed, seconds))
    }
    if (messagesProcessed >= maxMessages) {
      coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)
    }
  }
}

From: Yi Pan nickpa...@gmail.com To: dev@samza.apache.org Date: 20/05/2015 05:03 PM Subject: Re: Samza job throughput much lower than Kafka throughput Hi, George, Could you share w/ us the code and configuration of your sample test job? Thanks! -Yi On Wed, May 20, 2015 at 1:19 PM
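For reference, producer overrides like the ones suggested earlier in this thread are normally wired into the Samza job config by prefixing the Kafka producer property with the system name (Samza passes `systems.<system>.producer.*` through to the underlying Kafka producer). A sketch, assuming a system named `kafka`; the values are the ones from the thread, so tune them for your own workload:

```properties
# Producer tuning from the thread, expressed as Samza system properties.
systems.kafka.producer.batch.size=262144
systems.kafka.producer.linger.ms=5
systems.kafka.producer.compression.type=lz4
```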
Re: Thoughts and observations on Samza
Metamorphosis...nice. :) This has been a great discussion. As a user of Samza who's recently integrated it into a relatively large organization, I just want to add support to a few points already made. The biggest hurdles to adoption of Samza as it currently exists that I've experienced are: 1) YARN - YARN is overly complex in many environments where Puppet would do just fine, but it was the only mechanism to get fault tolerance. 2) Configuration - I think I like the idea of configuring most of the job in code rather than config files. In general, I think the goal should be to make it harder to make mistakes, especially of the kind where the code expects something and the config doesn't match. The current config is quite intricate and error-prone. For example, the application logic may depend on bootstrapping a topic but rather than asserting that in the code, you have to rely on getting the config right. Likewise with serdes, the Java representations produced by various serdes (JSON, Avro, etc.) are not equivalent so you cannot just reconfigure a serde without changing the code. It would be nice for jobs to be able to assert what they expect from their input topics in terms of partitioning. This is getting a little off topic but I was even thinking about creating a Samza config linter that would sanity check a set of configs. Especially in organizations where config is managed by a different team than the application developer, it's very hard to avoid config mistakes. 3) Java/Scala centric - for many teams (especially DevOps-type folks), the pain of the Java toolchain (maven, slow builds, weak command line support, configuration over convention) really inhibits productivity. As more and more high-quality clients become available for Kafka, I hope they'll follow Samza's model. Not sure how much it affects the proposals in this thread but please consider other languages in the ecosystem as well. From what I've heard, Spark has more Python users than Java/Scala. 
(FYI, we added a Jython wrapper for the Samza API https://github.com/Quantiply/rico/tree/master/jython/src/main/java/com/quantiply/samza and are working on a Yeoman generator https://github.com/Quantiply/generator-rico for Jython/Samza projects to alleviate some of the pain) I also want to underscore Jay's point about improving the user experience. That's a very important factor for adoption. I think the goal should be to make Samza as easy to get started with as something like Logstash. Logstash is vastly inferior in terms of capabilities to Samza but it's easy to get started and that makes a big difference. Cheers, Roger On Tue, Jul 7, 2015 at 3:29 AM, Gianmarco De Francisci Morales g...@apache.org wrote: Forgot to add. On the naming issues, Kafka Metamorphosis is a clear winner :) -- Gianmarco On 7 July 2015 at 13:26, Gianmarco De Francisci Morales g...@apache.org wrote: Hi, @Martin, thanks for your comments. Maybe I'm missing some important point, but I think coupling the releases is actually a *good* thing. To make an example, would it be better if the MR and HDFS components of Hadoop had different release schedules? Actually, keeping the discussion in a single place would make agreeing on releases (and backwards compatibility) much easier, as everybody would be responsible for the whole codebase. That said, I like the idea of absorbing samza-core as a sub-project, and leaving the fancy stuff separate. It probably gives 90% of the benefits we have been discussing here. Cheers, -- Gianmarco On 7 July 2015 at 02:30, Jay Kreps jay.kr...@gmail.com wrote: Hey Martin, I agree coupling release schedules is a downside. Definitely we can try to solve some of the integration problems in Confluent Platform or in other distributions. But I think this ends up being really shallow. 
I guess I feel to really get a good user experience the two systems have to kind of feel like part of the same thing and you can't really add that in later--you can put both in the same downloadable tar file but it doesn't really give a very cohesive feeling. I agree that ultimately any of the project stuff is as much social and naming as anything else--theoretically two totally independent projects could work to tightly align. In practice this seems to be quite difficult though. For the frameworks--totally agree it would be good to maintain the framework support with the project. In some cases there may not be too much there since the integration gets lighter but I think whatever stubs you need should be included. So no I definitely wasn't trying to imply dropping support for these frameworks, just making the integration lighter by separating process management from partition management. You raise two good points we would have to figure out if we went down the alignment path: 1. With respect to the name, yeah I think the first question is
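The config-linter idea raised earlier in this thread could start small: cross-check a flat Samza-style config map for mistakes that otherwise only surface at runtime. The keys and rules below are illustrative assumptions sketching the idea, not an official Samza tool:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a "config linter" for flat Samza-style configs.
// The rule set is illustrative; a real tool would check much more
// (bootstrap flags vs. task.inputs, partition expectations, etc.).
public class ConfigLinter {
    public static List<String> lint(Map<String, String> config) {
        List<String> problems = new ArrayList<>();
        if (!config.containsKey("job.name")) {
            problems.add("missing required key: job.name");
        }
        if (config.getOrDefault("task.inputs", "").isEmpty()) {
            problems.add("task.inputs is empty: the job would consume nothing");
        }
        // Every serde referenced by a stream must be registered, since a
        // misconfigured serde otherwise fails only when a message arrives.
        for (Map.Entry<String, String> e : config.entrySet()) {
            if (e.getKey().endsWith(".samza.msg.serde")) {
                String serdeName = e.getValue();
                if (!config.containsKey("serializers.registry." + serdeName + ".class")) {
                    problems.add("unregistered serde '" + serdeName + "' referenced by " + e.getKey());
                }
            }
        }
        return problems;
    }
}
```

Run as a pre-deploy check, this kind of linter catches exactly the class of mistakes described above: config managed by one team drifting away from what another team's code expects.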
Re: [Discuss/Vote] upgrade to Yarn 2.6.0
We're using 2.4.0 in production. Are there any major incompatibilities to watch out for when upgrading to 2.6.0? Thanks, Roger On Mon, Aug 17, 2015 at 4:41 PM, Yan Fang yanfang...@gmail.com wrote: Hi guys, we have been discussing upgrading to Yarn 2.6.0 (SAMZA-536 https://issues.apache.org/jira/browse/SAMZA-536), because there are some bug fixes after 2.4.0 and we cannot enable the Yarn RM recovery feature in Yarn 2.4.0 (SAMZA-750 https://issues.apache.org/jira/browse/SAMZA-750). So we just want to check whether any production users are still using Yarn 2.4.0 and do not plan to upgrade to 2.6.0+. If there are no further concerns, I think we can go ahead and upgrade to Yarn 2.6.0 in the Samza 0.10.0 release. Thanks, Fang, Yan yanfang...@gmail.com
Re: [Discuss/Vote] upgrade to Yarn 2.6.0
Works for me. Sent from my iPhone On Aug 24, 2015, at 7:48 AM, Yan Fang yanfang...@gmail.com wrote: Hi Roger, If you plan to upgrade to 2.6.0, and no other companies are using 2.4.0, I think we can upgrade to Yarn 2.6.0 in 0.10.0. Thanks, Fang, Yan yanfang...@gmail.com On Thu, Aug 20, 2015 at 4:48 PM, Yi Pan nickpa...@gmail.com wrote: Hi, Selina, Samza 0.9.1 on YARN 2.6 is a proven working combination. Best, -Yi On Thu, Aug 20, 2015 at 12:28 PM, Selina Tech swucaree...@gmail.com wrote: Hi, Yi: If I use Samza 0.9.1 and Yarn 2.6.0, will the system fail? Sincerely, Selina On Wed, Aug 19, 2015 at 1:58 PM, Yi Pan nickpa...@gmail.com wrote: Hi, Roger, At LinkedIn we have already moved to YARN 2.6 and are moving to YARN 2.7 now. I am not aware of any major issues in upgrading. I will let our team member Jon Bringhurst chime in since he did all the upgrades and may have more insights. @Jon, could you help to comment on this? Thanks! -Yi On Wed, Aug 19, 2015 at 9:12 AM, Roger Hoover roger.hoo...@gmail.com wrote: We're using 2.4.0 in production. Are there any major incompatibilities to watch out for when upgrading to 2.6.0? Thanks, Roger On Mon, Aug 17, 2015 at 4:41 PM, Yan Fang yanfang...@gmail.com wrote: Hi guys, we have been discussing upgrading to Yarn 2.6.0 (SAMZA-536 https://issues.apache.org/jira/browse/SAMZA-536), because there are some bug fixes after 2.4.0 and we cannot enable the Yarn RM recovery feature in Yarn 2.4.0 (SAMZA-750 https://issues.apache.org/jira/browse/SAMZA-750). So we just want to check whether any production users are still using Yarn 2.4.0 and do not plan to upgrade to 2.6.0+. If there are no further concerns, I think we can go ahead and upgrade to Yarn 2.6.0 in the Samza 0.10.0 release. Thanks, Fang, Yan yanfang...@gmail.com
Re: Use one producer for both coordinator stream and users system?
Hi Yan, My (uneducated) guess is that the performance gains come from batching. I don't know if the new producer ever batches by destination broker. If not, and it only batches by (broker, topic, partition), then I doubt that one vs. two producers will affect performance, since they send to different topics. Cheers, Roger On Tue, Aug 18, 2015 at 12:26 AM, Yan Fang yanfang...@gmail.com wrote: Hi Tao, First, one Kafka producer has an i/o thread. (Correct me if I am wrong.) Second, after Samza 0.10.0, we have a coordinator stream, which stores the checkpoint, config and other locality information for auto-scaling, dynamic configuration, etc. (See SAMZA-348 https://issues.apache.org/jira/browse/SAMZA-348). So we have a producer for this coordinator stream. Therefore, each container will have at least two producers: one for the coordinator stream, one for the users system. My question is, can we use only one producer for both the coordinator stream and the users system to get better performance? (From the doc, it may give better performance.) Thanks, Fang, Yan yanfang...@gmail.com On Mon, Aug 17, 2015 at 9:49 PM, Tao Feng fengta...@gmail.com wrote: Hi Yan, Naive question: what do we need the producer thread of the coordinator stream for? Thanks, -Tao On Mon, Aug 17, 2015 at 2:09 PM, Yan Fang yanfang...@gmail.com wrote: Hi guys, I have this question because Kafka's doc http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html seems to recommend having one producer shared by all threads (*The producer is thread safe and should generally be shared among all threads for best performance.*), while currently the coordinator stream is using a separate producer (usually, there are two producers (two producer threads) in each container: one for the coordinator stream, one for the real job). 1. Will having one producer shared by all threads really improve the performance? (Haven't done the perf test myself. Guess Kafka has some proof.) 2. 
if yes, should we go this way? Thanks, Fang, Yan yanfang...@gmail.com
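On the batching question above: as I understand it, the Kafka 0.8.2+ producer accumulates records into per-topic-partition batches, and the sender then groups ready batches by destination broker into a single request, so a shared producer can combine batches for different topics that happen to share a leader. The toy model below illustrates only that grouping (hypothetical topic/broker layout; not the Kafka client code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of producer batching: records accumulate per topic-partition,
// then ready batches are grouped by leader broker into one request each.
// The partition-to-broker layout here is made up for illustration.
public class BatchingModel {
    // topic-partition -> leader broker id (hypothetical assignment)
    static final Map<String, Integer> LEADER = Map.of(
        "coordinator-0", 1,
        "user-topic-0", 1,
        "user-topic-1", 2
    );

    // Group (topicPartition, value) records first by topic-partition
    // (batching), then by leader broker (one request per broker).
    public static Map<Integer, Map<String, List<String>>> group(List<String[]> records) {
        Map<Integer, Map<String, List<String>>> requests = new HashMap<>();
        for (String[] rec : records) {
            String tp = rec[0];
            String value = rec[1];
            int broker = LEADER.get(tp);
            requests.computeIfAbsent(broker, b -> new HashMap<>())
                    .computeIfAbsent(tp, t -> new ArrayList<>())
                    .add(value);
        }
        return requests;
    }
}
```

Under this model, one shared producer would put coordinator-stream and user-topic batches bound for broker 1 into the same request, while two separate producers each maintain their own connections and send separate requests; whether that difference matters in practice is the open question in the thread.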
Re: changelog compaction problem
You also may want to check if the cleaner thread in the broker is still alive (using jstack). I've run into this issue and used the fix mentioned in the ticket to get compaction working again. https://issues.apache.org/jira/browse/KAFKA-1641 I'd just like to mention that a possible workaround (depending on your situation in regard to keys) is to stop the broker, remove the cleaner offset checkpoint, and then start the broker again for each ISR member in serial to get the thread running again. Keep in mind that the cleaner will start from the beginning if you do this. On Wed, Jul 29, 2015 at 8:43 AM, Chinmay Soman chinmay.cere...@gmail.com wrote: Just curious, can you double check if you have log compaction enabled on your Kafka brokers? On Wed, Jul 29, 2015 at 8:30 AM, Vladimir Lebedev w...@fastmail.fm wrote: Hello, I have a problem where the changelog of one of my samza jobs grows indefinitely. The job is quite simple: it reads messages from the input kafka topic, and either creates or updates a key in the task-local samza store. Once a minute the window method kicks in, iterates over all keys in the store and deletes some of them, selecting on the contents of their value. The message rate in the input topic is about 3000 messages per second. The input topic is partitioned into 48 partitions. The average number of keys kept in the store is more or less stable and does not exceed 10000 keys per task. The average size of values is 50 bytes. So I expected that the sum of all segments' sizes in the kafka data directory for the job's changelog topic should not exceed 10000*50*48 ~= 24 MBytes. In fact it is more than 2.5GB (after 6 days running from scratch) and it is growing. I tried to change the default segment size for the changelog topic in kafka, and it worked a bit - instead of 500 MByte segments I now have 50 MByte segments, but it did not heal the indefinite data growth problem. Moreover, if I stop the job and start it again it cannot restart; it breaks right after reading all records from the changelog topic. 
Did somebody have a similar problem? How could it be resolved? Best regards, Vladimir -- Vladimir Lebedev w...@fastmail.fm -- Thanks and regards Chinmay Soman
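Vladimir's sizing estimate above can be checked with simple arithmetic: assuming the ~10,000 keys-per-task figure implied by his 24 MB total, the expected fully-compacted changelog size is keys × value-size × tasks (one task per input partition). A sketch:

```java
// Back-of-the-envelope check of the changelog size estimate above.
// The keys-per-task figure is the poster's stated average; a compacted
// topic retains roughly one record per live key once cleaning runs.
public class ChangelogSizeEstimate {
    public static long expectedBytes(long keysPerTask, long bytesPerValue, int tasks) {
        return keysPerTask * bytesPerValue * tasks;
    }

    public static void main(String[] args) {
        long bytes = expectedBytes(10_000, 50, 48);
        System.out.println(bytes + " bytes ~= " + (bytes / 1_000_000) + " MB"); // ~24 MB
    }
}
```

The 2.5 GB observed vs. ~24 MB expected is roughly a 100x gap, which is consistent with the compaction cleaner not running at all, as the KAFKA-1641 reply suggests.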
Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer
On July 29, 2015, 8:42 a.m., Dan Harvey wrote: samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java, line 116 https://reviews.apache.org/r/36815/diff/6/?file=1024157#file1024157line116 Should we add a Samza-specific message, then add the whole exception, so it's more clear what the exception was from if the user doesn't know the code? Logger.info("Failed to index message in ElasticSearch.", e); This would also be true for other log lines added. Good idea. Thanks. BTW, it didn't work like this: Logger.info("Failed to index message in ElasticSearch.", itemResp.getFailure()) so I did this: LOGGER.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage()); - Roger --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36815/#review93413 --- On July 29, 2015, 6:22 a.m., Roger Hoover wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36815/ --- (Updated July 29, 2015, 6:22 a.m.) Review request for samza and Dan Harvey. Repository: samza Description --- SAMZA-741 Add support for versioning to Elasticsearch System Producer Diffs - samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java f61bd36 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java e3b635b samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java afe0eee samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java 980964f samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java 684d7f6 Diff: https://reviews.apache.org/r/36815/diff/ Testing --- Refactored DefaultIndexRequestFactory to make it easier to subclass and customize to handle version and version_type parameters. 
Thanks, Roger Hoover
Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer
Thanks, Yi! On Wed, Jul 29, 2015 at 12:16 PM, Yi Pan nickpa...@gmail.com wrote: Hi, Roger, I am testing the patch now. Will update the JIRA soon. Thanks! -Yi On Wed, Jul 29, 2015 at 12:11 PM, Roger Hoover roger.hoo...@gmail.com wrote: Thank you, Dan. I think we're ready to merge. Can one of the Samza committers please take a look? On Wed, Jul 29, 2015 at 11:31 AM, Dan Harvey danharve...@gmail.com wrote: This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36815/ On July 29th, 2015, 8:42 a.m. UTC, *Dan Harvey* wrote: samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java https://reviews.apache.org/r/36815/diff/6/?file=1024157#file1024157line116 (Diff revision 6) public void register(final String source) { 116 LOGGER.info(itemResp.getFailureMessage()); Should we add a Samza-specific message, then add the whole exception, so it's more clear what the exception was from if the user doesn't know the code? Logger.info("Failed to index message in ElasticSearch.", e); This would also be true for other log lines added. On July 29th, 2015, 6:22 p.m. UTC, *Roger Hoover* wrote: Good idea. Thanks. BTW, it didn't work like this: Logger.info("Failed to index message in ElasticSearch.", itemResp.getFailure()) so I did this: LOGGER.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage()); On July 29th, 2015, 6:28 p.m. UTC, *Roger Hoover* wrote: This is what the messages look like 2015-07-29 11:15:02 ElasticsearchSystemProducer [INFO] Failed to index document in Elasticsearch: VersionConflictEngineException[[test-embedded.2015-07-29][0] [stuff][d3]: version conflict, current [9], provided [5]] That looks fine! - Dan On July 29th, 2015, 6:24 p.m. UTC, Roger Hoover wrote: Review request for samza and Dan Harvey. By Roger Hoover. 
*Updated July 29, 2015, 6:24 p.m.* *Repository: * samza Description SAMZA-741 Add support for versioning to Elasticsearch System Producer Testing Refactored DefaultIndexRequestFactory to make it easier to subclass and customize to handle version and version_type parameters. Diffs - samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java (f61bd36) - samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java (e3b635b) - samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java (afe0eee) - samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java (980964f) - samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java (684d7f6) View Diff https://reviews.apache.org/r/36815/diff/
Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer
Thank you, Dan. I think we're ready to merge. Can one of the Samza committers please take a look? On Wed, Jul 29, 2015 at 11:31 AM, Dan Harvey danharve...@gmail.com wrote: This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36815/ On July 29th, 2015, 8:42 a.m. UTC, *Dan Harvey* wrote: samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java https://reviews.apache.org/r/36815/diff/6/?file=1024157#file1024157line116 (Diff revision 6) public void register(final String source) { 116 LOGGER.info(itemResp.getFailureMessage()); Should we add a Samza-specific message, then add the whole exception, so it's more clear what the exception was from if the user doesn't know the code? Logger.info("Failed to index message in ElasticSearch.", e); This would also be true for other log lines added. On July 29th, 2015, 6:22 p.m. UTC, *Roger Hoover* wrote: Good idea. Thanks. BTW, it didn't work like this: Logger.info("Failed to index message in ElasticSearch.", itemResp.getFailure()) so I did this: LOGGER.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage()); On July 29th, 2015, 6:28 p.m. UTC, *Roger Hoover* wrote: This is what the messages look like 2015-07-29 11:15:02 ElasticsearchSystemProducer [INFO] Failed to index document in Elasticsearch: VersionConflictEngineException[[test-embedded.2015-07-29][0] [stuff][d3]: version conflict, current [9], provided [5]] That looks fine! - Dan On July 29th, 2015, 6:24 p.m. UTC, Roger Hoover wrote: Review request for samza and Dan Harvey. By Roger Hoover. *Updated July 29, 2015, 6:24 p.m.* *Repository: * samza Description SAMZA-741 Add support for versioning to Elasticsearch System Producer Testing Refactored DefaultIndexRequestFactory to make it easier to subclass and customize to handle version and version_type parameters. 
Diffs - samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java (f61bd36) - samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java (e3b635b) - samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java (afe0eee) - samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java (980964f) - samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java (684d7f6) View Diff https://reviews.apache.org/r/36815/diff/
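The log-message convention settled on in the thread above can be sketched roughly as follows. This is not the patch itself: `BulkItemStub` is a hypothetical stand-in for the Elasticsearch `BulkItemResponse`, used only so the sketch is self-contained without the Elasticsearch client on the classpath.

```java
// Sketch of the agreed pattern: prefix the raw Elasticsearch failure text
// with a producer-specific message so the Samza log line is self-explanatory.
public class IndexFailureLogging {
    // Hypothetical stand-in for BulkItemResponse (assumption, not the real type).
    public static class BulkItemStub {
        private final String failureMessage;
        public BulkItemStub(String failureMessage) { this.failureMessage = failureMessage; }
        public String getFailureMessage() { return failureMessage; }
    }

    // Mirrors LOGGER.error("Failed to index document in Elasticsearch: " + ...)
    public static String failureLogLine(BulkItemStub itemResp) {
        return "Failed to index document in Elasticsearch: " + itemResp.getFailureMessage();
    }

    public static void main(String[] args) {
        BulkItemStub resp = new BulkItemStub(
            "VersionConflictEngineException[[test][0] [stuff][d3]: version conflict, current [9], provided [5]]");
        System.out.println(failureLogLine(resp));
    }
}
```

A failure that previously surfaced as a bare Elasticsearch message now carries enough context to be traced back to the producer.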
Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer
On July 28, 2015, 7:36 a.m., Dan Harvey wrote: samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java, line 115 https://reviews.apache.org/r/36815/diff/3/?file=1023400#file1023400line115 could switch these around so you've got getStatus().equals(RestStatus.CONFLICT), fewer nots. Ok. I reworked it to not have nots. :) On July 28, 2015, 7:36 a.m., Dan Harvey wrote: samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java, line 117 https://reviews.apache.org/r/36815/diff/3/?file=1023400#file1023400line117 I don't think the LOGGER check here is needed? Yeah, not necessary here. On July 28, 2015, 7:36 a.m., Dan Harvey wrote: samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java, line 118 https://reviews.apache.org/r/36815/diff/3/?file=1023400#file1023400line118 I think it's worth adding a metric here too so we know how many conflicts occur. Then does the log message from Elasticsearch fit in with the Samza ones, and is it easy to understand? Added them in the updateSuccessMetrics() method On July 28, 2015, 7:36 a.m., Dan Harvey wrote: samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java, line 124 https://reviews.apache.org/r/36815/diff/3/?file=1023400#file1023400line124 it is odd that a method with the name updateSuccessMetrics returns the number of writes? could just leave the log line as it was? or it could compute the difference in the metrics before and after calling updateSuccessMetrics() here? might make more sense? Leaving it as is seemed misleading since version conflicts are not successfully written. 
Before/after comparison seems a little messy so I moved the log line to the updateSuccessMetrics() method On July 28, 2015, 7:36 a.m., Dan Harvey wrote: samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java, line 92 https://reviews.apache.org/r/36815/diff/3/?file=1023402#file1023402line92 I know I set this line before but reading the elasticsearch docs they recommend using `Requests.indexRequest(index).type(type)` Sounds good. On July 28, 2015, 7:36 a.m., Dan Harvey wrote: samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java, line 139 https://reviews.apache.org/r/36815/diff/3/?file=1023400#file1023400line139 This already gets logged and thrown in flush(), is this to see the exception sooner? These are the individual exceptions per document. They're not being logged in the flush() method. Only batch-level errors are saved and logged in the flush. I wasn't seeing any of the document level errors in the log. - Roger --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36815/#review93249 --- On July 28, 2015, 6:15 a.m., Roger Hoover wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36815/ --- (Updated July 28, 2015, 6:15 a.m.) Review request for samza and Dan Harvey. 
Repository: samza Description --- SAMZA-741 Add support for versioning to Elasticsearch System Producer Diffs - samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java f61bd36 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java e3b635b samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java afe0eee samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java 980964f samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java 684d7f6 Diff: https://reviews.apache.org/r/36815/diff/ Testing --- Refactored DefaultIndexRequestFactory to make it easier to subclass and customize to handle version and version_type parameters. Thanks, Roger Hoover
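The per-item handling discussed in this review (a positive comparison against RestStatus.CONFLICT, and conflicts counted as a metric rather than treated as failures) can be sketched as follows. This is an illustrative sketch, not the SAMZA-741 code: `RestStatus` and `ItemResult` are simplified stand-ins for the Elasticsearch types, and plain `AtomicLong` counters stand in for Samza's metrics.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Sketch: classify each bulk-item result, counting version conflicts
// separately from successful writes instead of logging them as errors.
public class BulkItemHandling {
    public enum RestStatus { OK, CREATED, CONFLICT }

    // Simplified stand-in for a bulk item response (assumption).
    public static class ItemResult {
        private final RestStatus status;
        public ItemResult(RestStatus status) { this.status = status; }
        public RestStatus getStatus() { return status; }
    }

    public final AtomicLong conflicts = new AtomicLong();
    public final AtomicLong successes = new AtomicLong();

    public void updateSuccessMetrics(List<ItemResult> items) {
        for (ItemResult item : items) {
            // Positive comparison first, per the "fewer nots" review comment.
            if (item.getStatus().equals(RestStatus.CONFLICT)) {
                conflicts.incrementAndGet();   // counted, not thrown
            } else {
                successes.incrementAndGet();
            }
        }
    }

    public static void main(String[] args) {
        BulkItemHandling h = new BulkItemHandling();
        h.updateSuccessMetrics(Arrays.asList(
            new ItemResult(RestStatus.CREATED),
            new ItemResult(RestStatus.CONFLICT),
            new ItemResult(RestStatus.OK)));
        System.out.println(h.successes.get() + " indexed, " + h.conflicts.get() + " conflicts");
    }
}
```

Keeping the conflict count inside `updateSuccessMetrics()` also sidesteps the before/after metric comparison Dan raised, which is the resolution Roger describes above.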
Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer
On July 29, 2015, 5:47 a.m., Yi Pan (Data Infrastructure) wrote: samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java, line 149 https://reviews.apache.org/r/36815/diff/4/?file=1024086#file1024086line149 Quick question: Is it guaranteed that there is no DeleteResponse here? It would be good to at least log a warn if we get an unexpected response here. Roger Hoover wrote: It is guaranteed that you will not get a DeleteResponse back because the producer currently only allows IndexRequests. In the future, if it supports DeleteRequest then we should add a counter metric for deletes. Yi Pan (Data Infrastructure) wrote: @Roger, thanks for the explanation. My point is: it would be good to make the code detect and log unexpected responses. Oh, I see. I just added a warning log to do that. - Roger --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36815/#review93395 --- On July 29, 2015, 6:22 a.m., Roger Hoover wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36815/ --- (Updated July 29, 2015, 6:22 a.m.) Review request for samza and Dan Harvey. 
Repository: samza Description --- SAMZA-741 Add support for versioning to Elasticsearch System Producer Diffs - samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java f61bd36 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java e3b635b samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java afe0eee samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java 980964f samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java 684d7f6 Diff: https://reviews.apache.org/r/36815/diff/ Testing --- Refactored DefaultIndexRequestFactory to make it easier to subclass and customize to handle version and version_type parameters. Thanks, Roger Hoover
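The response-type validation Yi asked for above, warning on anything other than an index response instead of silently assuming one, can be sketched like this. The response classes are hypothetical stand-ins for the Elasticsearch `ActionResponse` hierarchy, so the example compiles without the client library.

```java
// Sketch: dispatch on the concrete response class and flag anything
// unexpected, rather than blindly casting every bulk item to IndexResponse.
public class ResponseTypeCheck {
    public interface ItemResponse {}
    public static class IndexResponse implements ItemResponse {
        final boolean created;
        public IndexResponse(boolean created) { this.created = created; }
    }
    // Not produced today (only IndexRequests are allowed), but possible
    // if DeleteRequest support is added later.
    public static class DeleteResponse implements ItemResponse {}

    // Returns a classification; the real code would bump insert/update
    // counters and LOGGER.warn on the unexpected branch.
    public static String classify(ItemResponse resp) {
        if (resp instanceof IndexResponse) {
            return ((IndexResponse) resp).created ? "inserted" : "updated";
        }
        return "unexpected response type: " + resp.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(classify(new IndexResponse(true)));
        System.out.println(classify(new DeleteResponse()));
    }
}
```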
Re: [DISCUSS] Release 0.10.0
Thanks, Yi. I propose that we also include SAMZA-741 for Elasticsearch versioning support with the new ES producer. I think it's very close to being merged. Roger On Tue, Jul 28, 2015 at 10:08 PM, Yi Pan nickpa...@gmail.com wrote: Hi, all, I want to start the discussion on the release schedule for 0.10.0. There are a few important features that we plan to release in 0.10.0 and I want to start this thread s.t. we can agree on what to include in 0.10.0 release. There are the following main features added in 0.10.0: - RocksDB TTL support - Add CoordinatorStream and disable CheckpointManager - Elasticsearch Producer - Host affinity And other 0.10.0 tickets: https://issues.apache.org/jira/issues/?jql=project%20%3D%20SAMZA%20AND%20status%20in%20(%22In%20Progress%22%2C%20Resolved%2C%20Closed)%20AND%20fixVersion%20%3D%200.10.0 I propose to cut a 0.10.0 release after we get the following issues resolved: - SAMZA-615: Migrate checkpoint from checkpoint topic to Coordinator stream - SAMZA-617: YARN host affinity in Samza Thoughts? Thanks! -Yi
Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer
On July 29, 2015, 5:47 a.m., Yi Pan (Data Infrastructure) wrote: samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java, line 149 https://reviews.apache.org/r/36815/diff/4/?file=1024086#file1024086line149 Quick question: Is it guaranteed that there is no DeleteResponse here? It would be good to at least log a warn if we get an unexpected response here. It is guaranteed that you will not get a DeleteResponse back because the producer currently only allows IndexRequests. In the future, if it supports DeleteRequest then we should add a counter metric for deletes. - Roger --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36815/#review93395 --- On July 29, 2015, 5:17 a.m., Roger Hoover wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36815/ --- (Updated July 29, 2015, 5:17 a.m.) Review request for samza and Dan Harvey. Repository: samza Description --- SAMZA-741 Add support for versioning to Elasticsearch System Producer Diffs - samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java f61bd36 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java e3b635b samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java afe0eee samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java 980964f samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java 684d7f6 Diff: https://reviews.apache.org/r/36815/diff/ Testing --- Refactored DefaultIndexRequestFactory to make it easier to subclass and customize to handle version and version_type parameters. Thanks, Roger Hoover
Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36815/ --- (Updated July 29, 2015, 6:22 a.m.) Review request for samza and Dan Harvey. Changes --- Making the invalid response type an error level log message Repository: samza Description --- SAMZA-741 Add support for versioning to Elasticsearch System Producer Diffs (updated) - samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java f61bd36 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java e3b635b samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java afe0eee samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java 980964f samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java 684d7f6 Diff: https://reviews.apache.org/r/36815/diff/ Testing --- Refactored DefaultIndexRequestFactory to make it easier to subclass and customize to handle version and version_type parameters. Thanks, Roger Hoover
Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36815/ --- (Updated July 28, 2015, 6:13 a.m.) Review request for samza. Changes --- Ignoring version conflicts Repository: samza Description --- SAMZA-741 Add support for versioning to Elasticsearch System Producer Diffs (updated) - samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java f61bd36 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java e3b635b samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java afe0eee samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java 980964f samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java 684d7f6 Diff: https://reviews.apache.org/r/36815/diff/ Testing --- Refactored DefaultIndexRequestFactory to make it easier to subclass and customize to handle version and version_type parameters. Thanks, Roger Hoover
Re: Kafka broker error from samza producer
I forgot to mention that for me the error always happened after restarting a broker. Sent from my iPhone On Jul 23, 2015, at 4:25 PM, Jordan Shaw jor...@pubnub.com wrote: Hey Roger, I restarted the producer and the error went away on the broker. If it comes back I'll switch over to lz4. Thanks for the reply. -Jordan On Thu, Jul 23, 2015 at 9:32 AM, Roger Hoover roger.hoo...@gmail.com wrote: Hi Jordan, I ran into a similar issue when using snappy compression and the new producer. If you disable compression or switch to lz4 or gzip, does the issue go away? Cheers, Roger On Wed, Jul 22, 2015 at 11:54 PM, Jordan Shaw jor...@pubnub.com wrote: Hey Everyone, I'm getting a kafka.message.InvalidMessageException: Message found with corrupt size (0) in my kafka server.log; here is the full stack trace: https://gist.github.com/jshaw86/516cf47b6fd7559e7dc1. It indicates that the error is caused by a bad produce call from the Samza producer. Any idea what could be causing this? Just about the only thing that I can find is maybe an issue with snappy or compression, but I don't see a snappy call in the traceback. -- Jordan Shaw Full Stack Software Engineer PubNub Inc -- Jordan Shaw Full Stack Software Engineer PubNub Inc 1045 17th St San Francisco, CA 94107
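The workaround discussed above amounts to changing the producer's compression setting. `compression.type` is the standard Kafka producer property; in a Samza job it would typically be passed through system config as `systems.<system-name>.producer.compression.type` (assumed convention, shown here as plain producer `Properties`):

```java
import java.util.Properties;

// Sketch: producer properties with compression switched from snappy to lz4,
// as suggested in the thread. Broker address is a placeholder.
public class CompressionConfig {
    public static Properties producerProps(String compression) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("compression.type", compression);       // "none", "gzip", "snappy", or "lz4"
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("lz4");
        System.out.println("compression.type=" + props.getProperty("compression.type"));
    }
}
```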
Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36473/ --- (Updated July 22, 2015, 5 p.m.) Review request for samza. Repository: samza Description --- SAMZA-733 Add metrics to Elasticsearch System Producer Diffs (updated) - samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java PRE-CREATION samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java PRE-CREATION samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala 8eac8ef samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java a277b69 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java 7eb14a2 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java PRE-CREATION samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java PRE-CREATION samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java e63d62c Diff: https://reviews.apache.org/r/36473/diff/ Testing --- Tested that metrics for Elasticsearch producer appear in JMX and the metrics stream and that the metrics correctly count how many Elasticsearch documents were created and indexed. Thanks, Roger Hoover
Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36473/ --- (Updated July 22, 2015, 4:07 a.m.) Review request for samza. Repository: samza Description --- SAMZA-733 Add metrics to Elasticsearch System Producer Diffs (updated) - samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java PRE-CREATION samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java PRE-CREATION samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala 8eac8ef samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java a277b69 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java 7eb14a2 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java PRE-CREATION samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java PRE-CREATION samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java e63d62c Diff: https://reviews.apache.org/r/36473/diff/ Testing --- Tested that metrics for Elasticsearch producer appear in JMX and the metrics stream and that the metrics correctly count how many Elasticsearch documents were created and indexed. Thanks, Roger Hoover
Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer
On July 21, 2015, 5:42 p.m., Yan Fang wrote: samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala, lines 36-37 https://reviews.apache.org/r/36473/diff/3/?file=1017436#file1017436line36 though it works, prefer to use the def here, not only because it has less overhead, but also to keep all the methods consistent for better readability. What do you think? Roger Hoover wrote: Sounds good. I only baulked on it the first time because I'm not that skilled with Scala type declarations yet. :) I can make this work. I take it back. It seems it [can't be done](http://www.scala-lang.org/old/node/5082) - Roger --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36473/#review92429 --- On July 21, 2015, 5:41 a.m., Roger Hoover wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36473/ --- (Updated July 21, 2015, 5:41 a.m.) Review request for samza. Repository: samza Description --- SAMZA-733 Add metrics to Elasticsearch System Producer Diffs - samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java PRE-CREATION samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java PRE-CREATION samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala 8eac8ef samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java a277b69 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java 7eb14a2 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java PRE-CREATION samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java PRE-CREATION samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java e63d62c Diff: https://reviews.apache.org/r/36473/diff/ Testing --- Tested that metrics for Elasticsearch producer appear in 
JMX and the metrics stream and that the metrics correctly count how many Elasticsearch documents were created and indexed. Thanks, Roger Hoover
Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36815/ --- (Updated July 25, 2015, 4:48 p.m.) Review request for samza. Repository: samza Description --- SAMZA-741 Add support for versioning to Elasticsearch System Producer Diffs (updated) - samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java afe0eee Diff: https://reviews.apache.org/r/36815/diff/ Testing --- Refactored DefaultIndexRequestFactory to make it easier to subclass and customize to handle version and version_type parameters. Thanks, Roger Hoover
Re: Kafka broker error from samza producer
Hi Jordan, I ran into a similar issue when using snappy compression and the new producer. If you disable compression or switch to lz4 or gzip, does the issue go away? Cheers, Roger On Wed, Jul 22, 2015 at 11:54 PM, Jordan Shaw jor...@pubnub.com wrote: Hey Everyone, I'm getting a kafka.message.InvalidMessageException: Message found with corrupt size (0) in my kafka server.log; here is the full stack trace: https://gist.github.com/jshaw86/516cf47b6fd7559e7dc1. It indicates that the error is caused by a bad produce call from the Samza producer. Any idea what could be causing this? Just about the only thing that I can find is maybe an issue with snappy or compression, but I don't see a snappy call in the traceback. -- Jordan Shaw Full Stack Software Engineer PubNub Inc
How to map document version to the Elasticsearch System Producer?
Hi Dan and Samza devs, I have a use case for which I need to set an external version on Elasticsearch documents. Versioning (https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#index-versioning) lets you prevent duplicate messages from temporarily overwriting new versions of a document with old ones. Currently, the Elasticsearch system producer does not support setting versions. Since Kafka/Samza don't have support for key/value headers in messages, I think the best approach is to embed metadata into the stream name. We can add version and version_type as options in the stream name. These match up with the Elasticsearch REST API (https://www.elastic.co/blog/elasticsearch-versioning-support): {index-name}/{type-name}?version={version-id}&version_type={version-type} I've created a JIRA (https://issues.apache.org/jira/browse/SAMZA-741). I'd appreciate your feedback. Thanks, Roger
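The stream-name convention proposed above can be parsed with ordinary string handling. The parser below is an illustrative sketch of the idea, not the code from SAMZA-741:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: split a stream name of the form
// {index-name}/{type-name}?version={version-id}&version_type={version-type}
// into its components.
public class StreamNameParser {
    public static Map<String, String> parse(String streamName) {
        Map<String, String> parts = new HashMap<>();
        String path = streamName;
        int q = streamName.indexOf('?');
        if (q >= 0) {
            path = streamName.substring(0, q);
            // Query options like version and version_type.
            for (String pair : streamName.substring(q + 1).split("&")) {
                String[] kv = pair.split("=", 2);
                parts.put(kv[0], kv.length > 1 ? kv[1] : "");
            }
        }
        String[] indexAndType = path.split("/", 2);
        parts.put("index", indexAndType[0]);
        if (indexAndType.length > 1) parts.put("type", indexAndType[1]);
        return parts;
    }

    public static void main(String[] args) {
        Map<String, String> m = parse("metrics-2015/doc?version=5&version_type=external");
        System.out.println(m.get("index") + " " + m.get("type") + " v" + m.get("version")
            + " (" + m.get("version_type") + ")");
    }
}
```

With `version_type=external`, Elasticsearch rejects a write whose version is not greater than the stored one, which is what makes duplicate or out-of-order messages harmless.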
Metrics for Elasticsearch System Producer
Hi all, I've started using the new Elasticsearch System Producer (many thanks, Dan!) and decided to add some metrics to it. The JIRA ticket and review request links are here: https://issues.apache.org/jira/browse/SAMZA-733 https://reviews.apache.org/r/36473/ Cheers, Roger
Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36473/ --- Review request for samza. Repository: samza Description --- SAMZA-733 Add metrics to Elasticsearch System Producer Diffs - samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java a277b69 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java 7eb14a2 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java PRE-CREATION Diff: https://reviews.apache.org/r/36473/diff/ Testing --- Tested that metrics for Elasticsearch producer appear in JMX and the metrics stream and that the metrics correctly count how many Elasticsearch documents were created and indexed. Thanks, Roger Hoover
Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer
On July 14, 2015, 9:44 p.m., Yan Fang wrote: samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java, line 24 https://reviews.apache.org/r/36473/diff/1/?file=1010788#file1010788line24 can this class extend MetricsHelper? This can simplify things a little. I don't see how it simplifies things because I have to implement all the methods in the Scala trait. I'm having trouble getting the newGauge signatures to match.
```
public class ElasticsearchSystemProducerMetrics implements MetricsHelper {
    public final Counter bulkSendSuccess;
    public final Counter inserts;
    public final Counter updates;
    private final MetricsRegistry registry;
    private final String group;
    private final String systemName;

    public interface JFunction<R> { R apply(); }

    public ElasticsearchSystemProducerMetrics(String systemName, MetricsRegistry registry) {
        group = this.getClass().getName();
        this.registry = registry;
        this.systemName = systemName;
        bulkSendSuccess = newCounter("bulk-send-success");
        inserts = newCounter("docs-inserted");
        updates = newCounter("docs-updated");
    }

    @Override
    public Counter newCounter(String name) {
        return MetricsHelper$class.newCounter(this, name);
    }

    @Override
    public <T> Gauge<T> newGauge(String name, T value) {
        return MetricsHelper$class.newGauge(this, name, value);
    }

    @Override
    public <T> Gauge<T> newGauge(String name, JFunction<T> value) {
        return null;
    }

    @Override
    public Timer newTimer(String name) {
        return MetricsHelper$class.newTimer(this, name);
    }

    @Override
    public String getPrefix() {
        return systemName + "-";
    }

    @Override
    public MetricsRegistry registry() {
        return registry;
    }

    @Override
    public String group() {
        return group;
    }
}
```
- Roger --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36473/#review91670 --- On July 14, 2015, 6:12 a.m., Roger Hoover wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36473/ --- (Updated July 14, 2015, 6:12 a.m.) 
Review request for samza. Repository: samza Description --- SAMZA-733 Add metrics to Elasticsearch System Producer Diffs - samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java a277b69 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java 7eb14a2 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java PRE-CREATION Diff: https://reviews.apache.org/r/36473/diff/ Testing --- Tested that metrics for Elasticsearch producer appear in JMX and the metrics stream and that the metrics correctly count how many Elasticsearch documents were created and indexed. Thanks, Roger Hoover
Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer
On July 14, 2015, 9:44 p.m., Yan Fang wrote: samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java, line 152 https://reviews.apache.org/r/36473/diff/1/?file=1010787#file1010787line152 remove the space Sure thing. - Roger --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36473/#review91670 --- On July 14, 2015, 6:12 a.m., Roger Hoover wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36473/ --- (Updated July 14, 2015, 6:12 a.m.) Review request for samza. Repository: samza Description --- SAMZA-733 Add metrics to Elasticsearch System Producer Diffs - samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java a277b69 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java 7eb14a2 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java PRE-CREATION Diff: https://reviews.apache.org/r/36473/diff/ Testing --- Tested that metrics for Elasticsearch producer appear in JMX and the metrics stream and that the metrics correctly count how many Elasticsearch documents were created and indexed. Thanks, Roger Hoover
Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer
On July 14, 2015, 9:44 p.m., Yan Fang wrote: samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java, line 24 https://reviews.apache.org/r/36473/diff/1/?file=1010788#file1010788line24 can this class extend MetricsHelper? This can simplify things a little. Roger Hoover wrote: I don't see how it simplifies things because I have to implement all the methods in the Scala trait. I'm having trouble getting the newGauge signatures to match.
```
public class ElasticsearchSystemProducerMetrics implements MetricsHelper {
    public final Counter bulkSendSuccess;
    public final Counter inserts;
    public final Counter updates;
    private final MetricsRegistry registry;
    private final String group;
    private final String systemName;

    public interface JFunction<R> { R apply(); }

    public ElasticsearchSystemProducerMetrics(String systemName, MetricsRegistry registry) {
        group = this.getClass().getName();
        this.registry = registry;
        this.systemName = systemName;
        bulkSendSuccess = newCounter("bulk-send-success");
        inserts = newCounter("docs-inserted");
        updates = newCounter("docs-updated");
    }

    @Override
    public Counter newCounter(String name) {
        return MetricsHelper$class.newCounter(this, name);
    }

    @Override
    public <T> Gauge<T> newGauge(String name, T value) {
        return MetricsHelper$class.newGauge(this, name, value);
    }

    @Override
    public <T> Gauge<T> newGauge(String name, JFunction<T> value) {
        return null;
    }

    @Override
    public Timer newTimer(String name) {
        return MetricsHelper$class.newTimer(this, name);
    }

    @Override
    public String getPrefix() {
        return systemName + "-";
    }

    @Override
    public MetricsRegistry registry() {
        return registry;
    }

    @Override
    public String group() {
        return group;
    }
}
```
We really only need counters for this class but have to figure out how to implement the Scala newGauge methods, which are tricky. Would appreciate help if you know how to do it. - Roger --- This is an automatically generated e-mail. 
To reply, visit: https://reviews.apache.org/r/36473/#review91670 --- On July 14, 2015, 6:12 a.m., Roger Hoover wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36473/ --- (Updated July 14, 2015, 6:12 a.m.) Review request for samza. Repository: samza Description --- SAMZA-733 Add metrics to Elasticsearch System Producer Diffs - samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java a277b69 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java 7eb14a2 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java PRE-CREATION Diff: https://reviews.apache.org/r/36473/diff/ Testing --- Tested that metrics for Elasticsearch producer appear in JMX and the metrics stream and that the metrics correctly count how many Elasticsearch documents were created and indexed. Thanks, Roger Hoover
Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36473/ --- (Updated July 21, 2015, 5:41 a.m.) Review request for samza. Changes --- Fixed tests and added base class for Java metrics Repository: samza Description --- SAMZA-733 Add metrics to Elasticsearch System Producer Diffs (updated) - samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java PRE-CREATION samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java PRE-CREATION samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala 8eac8ef samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java a277b69 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java 7eb14a2 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java PRE-CREATION samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java PRE-CREATION samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java e63d62c Diff: https://reviews.apache.org/r/36473/diff/ Testing --- Tested that metrics for Elasticsearch producer appear in JMX and the metrics stream and that the metrics correctly count how many Elasticsearch documents were created and indexed. Thanks, Roger Hoover
Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36473/ --- (Updated July 15, 2015, 4:45 a.m.) Review request for samza. Changes --- Removed extra space Repository: samza Description --- SAMZA-733 Add metrics to Elasticsearch System Producer Diffs (updated) - samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java a277b69 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java 7eb14a2 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java PRE-CREATION Diff: https://reviews.apache.org/r/36473/diff/ Testing --- Tested that metrics for Elasticsearch producer appear in JMX and the metrics stream and that the metrics correctly count how many Elasticsearch documents were created and indexed. Thanks, Roger Hoover
Snapshot metrics stop getting scheduled if an exception occurs
Hi Samza devs, I ran into an issue with Samza 0.9.1 where I had a serialization exception thrown in the MetricsSnapshotReporter. It's very hard to find because nothing is logged and the metrics just stop getting scheduled. Samza should catch all exceptions in that thread, log them, and suppress them rather than no longer scheduling snapshot metrics at all. JIRA filed here: https://issues.apache.org/jira/browse/SAMZA-801 Cheers, Roger
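The failure mode described here is standard `ScheduledExecutorService` behavior: `scheduleAtFixedRate` permanently cancels a task the first time its `run()` throws, and nothing is logged unless someone retrieves the future. Below is a minimal sketch of the fix suggested in SAMZA-801 (catch, log, and suppress inside the scheduled task); the class and method names are illustrative, not the actual MetricsSnapshotReporter code:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SafeScheduling {
    // Wrap a task so no exception escapes to the executor; an escaped
    // exception would silently cancel all future executions.
    static Runnable suppressing(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                // Log and keep the schedule alive instead of dying silently.
                System.err.println("snapshot failed, will retry: " + t);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        exec.scheduleAtFixedRate(suppressing(() -> {
            runs.incrementAndGet();
            // Simulates the serialization exception from the report above.
            throw new RuntimeException("serialization error");
        }), 0, 20, TimeUnit.MILLISECONDS);
        Thread.sleep(200);
        exec.shutdownNow();
        // Without suppressing(), runs would stay at 1.
        System.out.println(runs.get() > 1);
    }
}
```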
Re: Checkpoint tool not working
I tried it once with 0.9.1 and it didn't work for me either. I didn't have time to examine it more carefully at the time. Roger On Thu, Oct 29, 2015 at 10:05 PM, Lukas Steiblys wrote: > I'm using Samza 0.9.1. > > Lukas > > On 10/29/15, Yi Pan wrote: > > Hi, Lukas, > > > > Which version of checkpoint-tool are you using? > > > > -Yi > > > > On Thu, Oct 29, 2015 at 5:39 PM, Lukas Steiblys > > wrote: > > > >> Hello, > >> > >> I’m trying to write the checkpoints for a Samza task supplying these > >> arguments to the checkpoint tool: > >> > >> bin/checkpoint-tool.sh > >> --new-offsets=file:///checkpoints/client-metrics.properties > >> --config-path=file:///checkpoints/task.properties > >> > >> However, it doesn’t actually write the checkpoints and, instead, prints > >> out the current checkpoints. > >> > >> Is the tool currently broken? A sample of the client-metrics.properties > >> file: > >> > >> tasknames.Partition 13.systems.kafka.streams.Sessions.partitions.13 = > >> 723 > >> tasknames.Partition 14.systems.kafka.streams.Sessions.partitions.14 = > >> 14589258 > >> tasknames.Partition > >> 15.systems.kafka.streams.Sessions.partitions.15=10886881 > >> > >> Lukas > > >
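For reference, the offsets-file entries quoted above follow a regular key layout. This sketch parses one line into its components; the format (`tasknames.<task>.systems.<system>.streams.<stream>.partitions.<n> = <offset>`) is inferred from the sample lines, not taken from checkpoint-tool documentation:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CheckpointLine {
    // Task names may contain spaces (e.g. "Partition 13"), so match them
    // greedily up to the ".systems." separator.
    private static final Pattern LINE = Pattern.compile(
        "tasknames\\.(.+)\\.systems\\.([^.]+)\\.streams\\.([^.]+)\\.partitions\\.(\\d+)\\s*=\\s*(\\d+)");

    // Returns {task, system, stream, partition, offset}, or null if the
    // line does not match the expected layout.
    static String[] parse(String line) {
        Matcher m = LINE.matcher(line.trim());
        if (!m.matches()) return null;
        return new String[] { m.group(1), m.group(2), m.group(3), m.group(4), m.group(5) };
    }

    public static void main(String[] args) {
        String[] f = parse("tasknames.Partition 13.systems.kafka.streams.Sessions.partitions.13 = 723");
        System.out.println(f[0] + "|" + f[2] + "|" + f[4]); // prints Partition 13|Sessions|723
    }
}
```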
Re: Samza and KStreams (KIP-28): LinkedIn's POV
Great. Thanks, Yi. On Mon, Oct 5, 2015 at 10:25 AM, Yi Pan <nickpa...@gmail.com> wrote: > Hi, Roger, > > > On Sat, Oct 3, 2015 at 11:13 AM, Roger Hoover <roger.hoo...@gmail.com> > wrote: > > > As previously discussed, the biggest request I > > have is being able to run Samza without YARN, under something like > > Kubernetes instead. > > > > > Totally. We will be actively working on the standalone Samza after the > upcoming 0.10 release. > > > > Also, I'm curious. What's the current state of the Samza SQL physical > > operators? Are they used in production yet? Is there a doc on how to > use > > them? > > > > > The current physical operators code now lives in samza-sql branch. There > are still two big pending check-ins in-review right now, one to stabilize > the operator's APIs, the other to implement the window operator. We are > planning to finish the proto-type in Q4. > > Regards, > > -Yi >
Re: Thoughts and observations on Samza
That would be great to let Kafka do as much heavy lifting as possible and make it easier for other languages to implement Samza APIs. One thing to watch out for is the interplay between Kafka's group management and the external scheduler/process manager's fault tolerance. If a container dies, the Kafka group membership protocol will try to assign its tasks to other containers while at the same time the process manager is trying to relaunch the container. Without some consideration for this (like a configurable amount of time to wait before Kafka alters the group membership), there may be thrashing going on which is especially bad for containers with large amounts of local state. Someone else pointed this out already but I thought it might be worth calling out again. Cheers, Roger On Tue, Jul 7, 2015 at 11:35 AM, Jay Kreps j...@confluent.io wrote: Hey Roger, I couldn't agree more. We spent a bunch of time talking to people and that is exactly the stuff we heard time and again. What makes it hard, of course, is that there is some tension between compatibility with what's there now and making things better for new users. I also strongly agree with the importance of multi-language support. We are talking now about Java, but for application development use cases people want to work in whatever language they are using elsewhere. I think moving to a model where Kafka itself does the group membership, lifecycle control, and partition assignment has the advantage of putting all that complex stuff behind a clean api that the clients are already going to be implementing for their consumer, so the added functionality for stream processing beyond a consumer becomes very minor. -Jay On Tue, Jul 7, 2015 at 10:49 AM, Roger Hoover roger.hoo...@gmail.com wrote: Metamorphosis...nice. :) This has been a great discussion. As a user of Samza who's recently integrated it into a relatively large organization, I just want to add support to a few points already made.
The biggest hurdles to adoption of Samza as it currently exists that I've experienced are: 1) YARN - YARN is overly complex in many environments where Puppet would do just fine but it was the only mechanism to get fault tolerance. 2) Configuration - I think I like the idea of configuring most of the job in code rather than config files. In general, I think the goal should be to make it harder to make mistakes, especially of the kind where the code expects something and the config doesn't match. The current config is quite intricate and error-prone. For example, the application logic may depend on bootstrapping a topic but rather than asserting that in the code, you have to rely on getting the config right. Likewise with serdes, the Java representations produced by various serdes (JSON, Avro, etc.) are not equivalent so you cannot just reconfigure a serde without changing the code. It would be nice for jobs to be able to assert what they expect from their input topics in terms of partitioning. This is getting a little off topic but I was even thinking about creating a Samza config linter that would sanity check a set of configs. Especially in organizations where config is managed by a different team than the application developer, it's very hard to avoid config mistakes. 3) Java/Scala centric - for many teams (especially DevOps-type folks), the pain of the Java toolchain (maven, slow builds, weak command line support, configuration over convention) really inhibits productivity. As more and more high-quality clients become available for Kafka, I hope they'll follow Samza's model. Not sure how much it affects the proposals in this thread but please consider other languages in the ecosystem as well. From what I've heard, Spark has more Python users than Java/Scala.
(FYI, we added a Jython wrapper for the Samza API https://github.com/Quantiply/rico/tree/master/jython/src/main/java/com/quantiply/samza and are working on a Yeoman generator https://github.com/Quantiply/generator-rico for Jython/Samza projects to alleviate some of the pain) I also want to underscore Jay's point about improving the user experience. That's a very important factor for adoption. I think the goal should be to make Samza as easy to get started with as something like Logstash. Logstash is vastly inferior in terms of capabilities to Samza but it's easy to get started and that makes a big difference. Cheers, Roger On Tue, Jul 7, 2015 at 3:29 AM, Gianmarco De Francisci Morales g...@apache.org wrote: Forgot to add. On the naming issues, Kafka Metamorphosis is a clear winner :) -- Gianmarco On 7 July 2015 at 13:26, Gianmarco De Francisci Morales g...@apache.org wrote: Hi, @Martin, thanks for your comments. Maybe I'm missing some important point, but I think coupling the releases
Re: Thoughts and observations on Samza
That's great. Thanks, Jay. On Fri, Jul 10, 2015 at 8:46 AM, Jay Kreps j...@confluent.io wrote: Yeah totally agree. I think you have this issue even today, right? I.e. if you need to make a simple config change and you're running in YARN today you end up bouncing the job which then rebuilds state. I think the fix is exactly what you described which is to have a long timeout on partition movement for stateful jobs so that if a job is just getting bounced, and the cluster manager (or admin) is smart enough to restart it on the same host when possible, it can optimistically reuse any existing state it finds on disk (if it is valid). So in this model the charter of the CM is to place processes as stickily as possible and to restart or re-place failed processes. The charter of the partition management system is to control the assignment of work to these processes. The nice thing about this is that the work assignment, timeouts, behavior, configs, and code will all be the same across all cluster managers. So I think that prototype would actually give you exactly what you want today for any cluster manager (or manual placement + restart script) that was sticky in terms of host placement since there is already a configurable partition movement timeout and task-by-task state reuse with a check on state validity. -Jay On Fri, Jul 10, 2015 at 8:34 AM, Roger Hoover roger.hoo...@gmail.com wrote: That would be great to let Kafka do as much heavy lifting as possible and make it easier for other languages to implement Samza APIs. One thing to watch out for is the interplay between Kafka's group management and the external scheduler/process manager's fault tolerance. If a container dies, the Kafka group membership protocol will try to assign its tasks to other containers while at the same time the process manager is trying to relaunch the container.
Without some consideration for this (like a configurable amount of time to wait before Kafka alters the group membership), there may be thrashing going on which is especially bad for containers with large amounts of local state. Someone else pointed this out already but I thought it might be worth calling out again. Cheers, Roger On Tue, Jul 7, 2015 at 11:35 AM, Jay Kreps j...@confluent.io wrote: Hey Roger, I couldn't agree more. We spent a bunch of time talking to people and that is exactly the stuff we heard time and again. What makes it hard, of course, is that there is some tension between compatibility with what's there now and making things better for new users. I also strongly agree with the importance of multi-language support. We are talking now about Java, but for application development use cases people want to work in whatever language they are using elsewhere. I think moving to a model where Kafka itself does the group membership, lifecycle control, and partition assignment has the advantage of putting all that complex stuff behind a clean api that the clients are already going to be implementing for their consumer, so the added functionality for stream processing beyond a consumer becomes very minor. -Jay On Tue, Jul 7, 2015 at 10:49 AM, Roger Hoover roger.hoo...@gmail.com wrote: Metamorphosis...nice. :) This has been a great discussion. As a user of Samza who's recently integrated it into a relatively large organization, I just want to add support to a few points already made. The biggest hurdles to adoption of Samza as it currently exists that I've experienced are: 1) YARN - YARN is overly complex in many environments where Puppet would do just fine but it was the only mechanism to get fault tolerance. 2) Configuration - I think I like the idea of configuring most of the job in code rather than config files. 
In general, I think the goal should be to make it harder to make mistakes, especially of the kind where the code expects something and the config doesn't match. The current config is quite intricate and error-prone. For example, the application logic may depend on bootstrapping a topic but rather than asserting that in the code, you have to rely on getting the config right. Likewise with serdes, the Java representations produced by various serdes (JSON, Avro, etc.) are not equivalent so you cannot just reconfigure a serde without changing the code. It would be nice for jobs to be able to assert what they expect from their input topics in terms of partitioning. This is getting a little off topic but I was even thinking about creating a Samza config linter that would sanity check a set of configs. Especially in organizations where config is managed by a different team than the application developer, it's very hard to avoid config mistakes. 3) Java/Scala centric
Re: Sample code or tutorial for writing/reading Avro type message in Samza
Hi Selina, If you want to use Confluent's schema registry for Avro, then I have an example in this repo: https://github.com/theduderog/hello-samza-confluent Cheers, Roger On Tue, Nov 17, 2015 at 12:32 AM, Selina Tech wrote: > Dear All: > Do you know where I can find the tutorial or sample code for writing > Avro-type messages to Kafka and reading Avro-type messages from Kafka in > Samza? > I am wondering how I should serialize a GenericRecord to bytes and > deserialize it? > Your comments/suggestions are highly appreciated. > > Sincerely, > Selina >
Re: New Samza blog published - http://engineering.linkedin.com/performance/benchmarking-apache-samza-12-million-messages-second-single-node
Thanks for sharing! Tao, did you use YARN to run 15 containers or is there a way to have them statically divide up the tasks? On Mon, Aug 24, 2015 at 12:03 PM, Ed Yakabosky eyakabo...@linkedin.com.invalid wrote: Hi Samza open source, I want to share that Tao Feng https://www.linkedin.com/pub/tao-feng/14/958/171 (from LinkedIn's Performance Team) has published a blog post http://engineering.linkedin.com/performance/benchmarking-apache-samza-12-million-messages-second-single-node on Samza perf benchmarks in collaboration with our development team. A lot of hard work went into this blog post so please join me in congratulating Tao and other contributors, and please share to your Big Data social media circles. Synopsis: The objective of the blog is to measure Samza's performance in terms of the message-processing rate for a single machine for typical Samza use cases. This will help engineers to understand and to optimize performance and provide a basis for establishing a capacity model to run Samza platform as a service. Thanks, Ed Yakabosky Streams Infrastructure TPM, LinkedIn
Re: Samza and KStreams (KIP-28): LinkedIn's POV
Hi Yi, Thank you for sharing this update and perspective. I tend to agree that for simple, stateless cases, things could be easier and hopefully KStreams may help with that. I also appreciate a lot of features that Samza already supports for operations. As previously discussed, the biggest request I have is being able to run Samza without YARN, under something like Kubernetes instead. Also, I'm curious. What's the current state of the Samza SQL physical operators? Are they used in production yet? Is there a doc on how to use them? Thanks, Roger On Fri, Oct 2, 2015 at 1:54 PM, Yi Pan wrote: > Hi, all Samza-lovers, > > This question on the relationship of Kafka KStream (KIP-28) and Samza has > come up a couple times recently. So we wanted to clarify where we stand at > LinkedIn in terms of this discussion. > > Samza has historically had a symbiotic relationship with Kafka and will > continue to work very well with Kafka. Earlier in the year, we had an > in-depth discussion exploring an even deeper integration with Kafka. After > hitting multiple practical issues (e.g. Apache rules) and technical issues > we had to give up on that idea. As a fallout of the discussion, the Kafka > community is adding some of the basic event processing capabilities into > Kafka core directly. The basic callback/push style programming model by > itself is a great addition to the Kafka API set. > > However at LinkedIn, we continue to be firmly committed to Samza as our > stream processing framework. Although KStream is a nice addition to the Kafka > stack, our goals for Samza are broader. There are some key technical > differences that make Samza the right strategy for us. > > 1. Support for non-kafka systems : > > At LinkedIn a larger percentage of our streaming jobs use Databus as an > input source.
For any such non-Kafka source, although the CopyCat > connector framework gives a common model for pulling data out of a source > and pushing it into Kafka, it introduces yet another piece of > infrastructure that we have to operate and manage. Also for any companies > who are already on AWS, Google Compute, Azure etc. asking them to deploy > and operate kafka in AWS instead of using the natively supported services > like Kinesis, Google Cloud pub-sub, etc. etc. can potentially be a > non-starter. With more acquisitions at LinkedIn that use AWS we are also > facing this first hand. The Samza community has a healthy set of system > producers/consumers which are in the works (Kinesis, ActiveMQ, > ElasticSearch, HDFS, etc.). > > 2. We run Samza as a Stream Processing Service at LinkedIn. This is > fundamentally different from KStream. > > This is similar to AWS Lambda and Google Cloud Dataflow, Azure Stream > Insight and similar services. The service makes it much easier for > developers to get their stream processing jobs up and running in production > by helping with the most common problems like monitoring, dashboards, > auto-scale, rolling upgrades and such. > > The truth is that if the stream processing application is stateless then > some of these common problems are not as involved and can be solved even on > regular IaaS platforms like EC2 and such. Arguably stateless applications > can be built easily on top of the native APIs from the input source like > kafka, kinesis etc. > > The place where Samza shines is with stateful stream processing > applications. When a Samza application uses the local rocks DB based > state, the application needs special care in terms of rolling upgrades, > addition/removal of containers/machines, temporary machine failures, > capacity management. We have already done some really good work in Samza > 0.10 to make sure that we don't reseed the state from kafka (i.e. > host-affinity feature that allows to reuse the local states). 
In the > absence of this feature, we had multiple production issues caused due to > network saturation during state reseeding from kafka. The problems with > stateful applications are similar to problems encountered when building > no-sql databases and other data systems. > > There are surely some scenarios where customers don't want any YARN > dependency and want to run their stream processing application on a > dedicated set of nodes. This is where KStream clearly has an advantage > over current Samza. Prior to KStream we had a patch in Samza which also > solved the same problem (SAMZA-516). We do expect to finish this patch soon > and formally support Stand Alone Samza. > > 3. Operators for Stream Processing and SQL : > > At LinkedIn, there is a growing need to iterate Samza jobs faster in the > loop of implement, deploy, revise the code, and deploy again. A key > bottleneck that slows down this iteration is the implementation of a Samza > job. It has long-been recognized in the Samza community that there is a > strong need for a high-level language support to shorten this iterative > process. Since last year, we have
Re: Executing Samza jobs natively in Kubernetes
Elias, I would also love to be able to deploy Samza on Kubernetes with dynamic task management. Thanks for sharing this. It may be a good interim solution. Roger On Sun, Nov 29, 2015 at 11:18 AM, Elias Levy wrote: > I've been exploring Samza for stream processing as well as Kubernetes as a > container orchestration system and I wanted to be able to use one with the > other. The prospect of having to execute YARN either alongside or on top > of Kubernetes did not appeal to me, so I developed a KubernetesJob > implementation of SamzaJob. > > You can find the details at https://github.com/eliaslevy/samza_kubernetes, > but in summary KubernetesJob executes and generates a serialized JobModel. > Instead of interacting with Kubernetes directly to create the > SamzaContainers (as the YarnJob's SamzaApplicationMaster may do with the > YARN RM), it outputs a config YAML file that can be used to create the > SamzaContainers in Kubernetes by using Resource Controllers. For this you > need to package your job as a Docker image. You can read the README at > the above repo for details. > > A few observations: > > It would be useful if SamzaContainer accepted the JobModel via an > environment variable. Right now it expects a URL to download it from. I > get around this by using an entry point script that copies the model from an > environment variable into a file, then passes a file URL to SamzaContainer. > > SamzaContainer doesn't allow you to configure the JMX port. It selects a > port at random from the ephemeral range as it expects to execute in YARN > where a static port could result in a conflict. This is not the case in > Kubernetes where each Pod (i.e. SamzaContainer) is given its own IP > address. > > This implementation doesn't provide a Samza dashboard, which in the YARN > implementation is hosted in the Application Master. There didn't seem to > be much value provided by the dashboard that is not already provided by the > Kubernetes tools for monitoring pods.
> > I've successfully executed the hello-samza jobs in Kubernetes: > > $ kubectl get po > NAME READY STATUS RESTARTS AGE > kafka-1-jjh8n 1/1 Running 0 2d > kafka-2-buycp 1/1 Running 0 2d > kafka-3-tghkp 1/1 Running 0 2d > wikipedia-feed-0-4its2 1/1 Running 0 1d > wikipedia-parser-0-l0onv 1/1 Running 0 17h > wikipedia-parser-1-crrxh 1/1 Running 0 17h > wikipedia-parser-2-1c5nn 1/1 Running 0 17h > wikipedia-stats-0-3gaiu 1/1 Running 0 16h > wikipedia-stats-1-j5qlk 1/1 Running 0 16h > wikipedia-stats-2-2laos 1/1 Running 0 16h > zookeeper-1-1sb4a 1/1 Running 0 2d > zookeeper-2-dndk7 1/1 Running 0 2d > zookeeper-3-46n09 1/1 Running 0 2d > > > Finally, accessing services within the Kubernetes cluster from the outside > is quite cumbersome unless one uses an external load balancer. This makes > it difficult to bootstrap a job, as SamzaJob must connect to Zookeeper and > Kafka to find out the number of partitions on the topics it will subscribe > to, so it can assign them statically among the number of containers > requested. > > Ideally Samza would operate along the lines of the Kafka high-level > consumer, which dynamically coordinates to allocate work among members of a > consumer group. This would do away with the need to execute SamzaJob a > priori to generate the JobModel to pass to the SamzaContainers. It would > also allow for dynamically changing the number of containers without having > to shut down the job. >
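The entry-point workaround Elias describes (copy the JobModel from an environment variable into a file, then hand SamzaContainer a file URL) can be sketched in Java as below. The `JOB_MODEL` variable name is an assumption for illustration; it is not an actual Samza or samza_kubernetes setting:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class JobModelFromEnv {
    // Write the serialized JobModel to a temp file and return a file:// URL
    // that a component expecting a URL (like SamzaContainer) can consume.
    static String writeModel(String json) throws IOException {
        Path p = Files.createTempFile("job-model", ".json");
        Files.write(p, json.getBytes(StandardCharsets.UTF_8));
        return p.toUri().toString();
    }

    public static void main(String[] args) throws IOException {
        // JOB_MODEL is a hypothetical env var set by the Kubernetes pod spec.
        String model = System.getenv().getOrDefault("JOB_MODEL", "{}");
        String url = writeModel(model);
        System.out.println(url.startsWith("file:"));
    }
}
```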
Re: Executing Samza jobs natively in Kubernetes
Awesome. Thanks. On Sun, Nov 29, 2015 at 3:25 PM, Elias Levy <fearsome.lucid...@gmail.com> wrote: > Roger, > > You are welcomed. If you want to experiment, you can use my hello samza > <https://hub.docker.com/r/elevy/hello-samza/> Docker image. > > On Sun, Nov 29, 2015 at 12:19 PM, Roger Hoover <roger.hoo...@gmail.com> > wrote: > > > Elias, > > > > I would also love to be able to deploy Samza on Kubernetes with dynamic > > task management. Thanks for sharing this. It may be a good interim > > solution. > > > > Roger > > > > On Sun, Nov 29, 2015 at 11:18 AM, Elias Levy < > fearsome.lucid...@gmail.com> > > wrote: > > > > > I've been exploring Samza for stream processing as well as Kubernetes > as > > a > > > container orchestration system and I wanted to be able to use one with > > the > > > other. The prospect of having to execute YARN either along side or on > > top > > > of Kubernetes did not appeal to me, so I developed a KubernetesJob > > > implementation of SamzaJob. > > > > > > You can find the details at > > https://github.com/eliaslevy/samza_kubernetes, > > > but in summary KubernetesJob executes and generates a serialized > > JobModel. > > > Instead of interacting with Kubernetes directly to create the > > > SamzaContainers (as the YarnJob's SamzaApplicationMaster may do with > the > > > YARN RM), it output a config YAML file that can be used to create the > > > SamzaContainers in Kubernetes by using Resource Controllers. For this > > you > > > require to package your job as a Docker image. You can reach the > README > > at > > > the above repo for details. > > > > > > A few observations: > > > > > > It would be useful if SamzaContainer accepted the JobModel via an > > > environment variable. Right not it expects a URL to download it > from. I > > > get around this by using a entry point script that copies the model > from > > an > > > environment variable into a file, then passes a file URL to > > SamzaContainer. 
> > > > > > SamzaContainer doesn't allow you to configure the JMX port. It > selects a > > > port at random from the ephemeral range as it expects to execute in > YARN > > > where a static port could result in a conflict. This is not the case > in > > > Kubernetes where each Pod (i.e. SamzaContainer) is given its own IP > > > address. > > > > > > This implementation doesn't provide a Samza dashboard, which in the > YARN > > > implementation is hosted in the Application Master. There didn't seem > to > > > be much value provided by the dashboard that is not already provided by > > the > > > Kubernetes tools for monitoring pods. > > > > > > I've successfully executed the hello-samza jobs in Kubernetes: > > > > > > $ kubectl get po > > > NAME READY STATUSRESTARTS AGE > > > kafka-1-jjh8n 1/1 Running 0 2d > > > kafka-2-buycp 1/1 Running 0 2d > > > kafka-3-tghkp 1/1 Running 0 2d > > > wikipedia-feed-0-4its2 1/1 Running 0 1d > > > wikipedia-parser-0-l0onv 1/1 Running 0 17h > > > wikipedia-parser-1-crrxh 1/1 Running 0 17h > > > wikipedia-parser-2-1c5nn 1/1 Running 0 17h > > > wikipedia-stats-0-3gaiu1/1 Running 0 16h > > > wikipedia-stats-1-j5qlk1/1 Running 0 16h > > > wikipedia-stats-2-2laos1/1 Running 0 16h > > > zookeeper-1-1sb4a 1/1 Running 0 2d > > > zookeeper-2-dndk7 1/1 Running 0 2d > > > zookeeper-3-46n09 1/1 Running 0 2d > > > > > > > > > Finally, accessing services within the Kubernetes cluster from the > > outside > > > is quite cumbersome unless one uses an external load balancer. This > > makes > > > it difficult to bootstrap a job, as SamzaJob must connect to Zookeeper > > and > > > Kafka to find out the number of partitions on the topics it will > > subscribe > > > to, so it can assign them statically among the number of containers > > > requested. > > > > > > Ideally Samza would operate along the lines of the Kafka high-level > > > consumer, which dynamically coordinate to allocate work among members > of > > a > > > consumer group. 
This would do away with the new to execute SamzaJob a > > > priori to generate the JobModel to pass to the SamzaContainers. It > would > > > also allow for dynamically changing the number of containers without > > having > > > the shutdown the job. > > > > > >
Re: HTTP-based Elasticsearch system producer and reusable task
Hi Yi, It could be merged into the Samza project if there's enough interest but may need some re-working depending on which dependencies are ok to bring in. I did it outside of the Samza project first because I had to get it done quickly, so it relies on Java 8 features, dropwizard metrics for histogram metrics, and JEST (https://github.com/searchbox-io/Jest) which itself drags in more dependencies (Guava, Gson, commons http). There are a few issues with the existing ElasticsearchSystemProducer: 1. The plugin API (IndexRequestFactory) is tied to the Elasticsearch Java API (a bulky dependency) 2. It only supports index requests. I needed to also support updates and deletes. 3. There is currently no plugin mechanism to register a flush listener. The reason I needed that was to be able to report end-to-end latency stats (total pipeline latency = commit time - event time). #3 is easily solvable with additional plugin options. #1 and #2 require changing the system producer API. Roger On Tue, Feb 9, 2016 at 10:56 AM, Yi Pan <nickpa...@gmail.com> wrote: > Hi, Roger, > > That's awesome! Are you planning to submit the HTTP-based system producer > in the Samza open-source samza-elasticsearch module? If the ElasticSearch community > suggests that HTTP-based clients be the recommended way, we should use it in > samza-elasticsearch as well. And what's your opinion on the existing > ElasticsearchSystemProducer? If the SystemProducer APIs and configuration > options do not change, I would vote to replace the implementation w/ > HTTP-based ElasticsearchSystemProducer. > > Thanks for putting these new additions up! > > -Yi > > On Tue, Feb 9, 2016 at 10:39 AM, Roger Hoover <roger.hoo...@gmail.com> > wrote: > > > Hi Samza folks, > > > > For people who want to use HTTP to integrate with Elasticsearch, I wrote > an > > HTTP-based system producer and a reusable task, including latency stats > > from event origin time, task processing time, and time spent talking to > > the Elasticsearch API.
> > > > > https://github.com/quantiply/rico/blob/master/docs/common_tasks/es-push.md > > > > Cheers, > > > > Roger > > >
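The flush-listener idea mentioned in this thread (#3: total pipeline latency = commit time minus event time) could look roughly like this. The interface shape and names are illustrative, not the actual rico or samza-elasticsearch API:

```java
import java.util.Arrays;
import java.util.List;

public class FlushLatency {
    // Illustrative callback a producer could invoke after each successful
    // bulk flush, passing the commit time and the event-origin times of the
    // documents in the batch.
    interface FlushListener {
        void afterFlush(long commitTimeMs, List<Long> eventTimesMs);
    }

    // End-to-end latency of the slowest event in the batch:
    // commit time minus event-origin time.
    static long maxLatencyMs(long commitTimeMs, List<Long> eventTimesMs) {
        long max = 0;
        for (long t : eventTimesMs) {
            max = Math.max(max, commitTimeMs - t);
        }
        return max;
    }

    public static void main(String[] args) {
        FlushListener reporter = (commit, events) ->
            System.out.println("max end-to-end latency ms: " + maxLatencyMs(commit, events));
        reporter.afterFlush(1000L, Arrays.asList(400L, 250L, 900L)); // prints 750
    }
}
```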
Re: ElasticsearchSystemProducer Crashes Samza Job
Hi Jeremiah, There's currently no way to do that. I think the best way to modify the existing ElasticsearchSystemProducer would be to add a config option for a callback to let you customize this behavior. Basically, a pluggable listener ( https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java#L101 ). On Mon, Feb 15, 2016 at 2:30 PM, jeremiah adams wrote: > We have a samza job configured to run in a yarn cluster. This job consumes > multiple kafka topics and routes the messages to elasticsearch for > indexing. When enough batch-updates to elasticsearch fail using the > ElasticsearchSystemProducer, the entire samza job dies. Due to > checkpointing + yarn, the job starts back up, starts reading where it left > off and dies again. Enter loop. > > Updates to ES are failing due to invalid data on the part of our consumers > but I can't always control them so need to be defensive about the code. I > don't see how to handle this in any of the source examples. I would like to > just trap this error and if it is what I expect it to be - squash it. Can > someone point me in the right direction? > > Below is the log where the failure occurs. > > 2016-02-15 18:55:26 ElasticsearchSystemProducer [ERROR] Unable to send > message from TaskName-Partition 5 to system elastic. > 2016-02-15 18:55:26 SamzaContainerExceptionHandler [ERROR] Uncaught > exception in thread (name=main). Exiting process now. > org.apache.samza.SamzaException: Unable to send message from > TaskName-Partition 5 to system elastic.
> at > > org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.flush(ElasticsearchSystemProducer.java:186) > at > > org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.stop(ElasticsearchSystemProducer.java:92) > at > > org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47) > at > > org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206) > at org.apache.samza.system.SystemProducers.stop(SystemProducers.scala:47) > at > > org.apache.samza.container.SamzaContainer.shutdownProducers(SamzaContainer.scala:672) > at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:564) > at > > org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92) > at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66) > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala) > 2016-02-15 18:55:26 RunLoop [INFO] Shutting down, will wait up to 5000 ms > 2016-02-15 18:55:31 RunLoop [WARN] Did not shut down within 5000 ms, > exiting > > - jeremiah >
Re: ElasticsearchSystemProducer Crashes Samza Job
The code that you showed below is part of the BulkProcessor.Listener interface so if that listener were pluggable, you could override the default behavior (which is to only ignore version conflicts). On Tue, Feb 16, 2016 at 12:27 PM, jeremiah adams <jadams...@gmail.com> wrote: > The root of the issue may be in the HTTP status code handling. This code > seems to imply that the only valid error case from Elasticsearch is > conflict. This is too narrow of a constraint. In one of my use cases, a > mapping/message conflict occurs resulting in an HTTP 400. In my case, it is > perfectly reasonable to log the error, not raise hasFatalError = true and > continue processing. A way to control this via properties or some other > mechanism would probably solve the issue. Setting the hasFatalError flag > looks to be the source of the unhandled exception that ultimately fails the > job. > > I don't think the ListenerCallback will solve the problem. I don't > understand how that might stop the unhandled exception being raised by > flush(). > > if (itemResp.getFailure().getStatus().equals(RestStatus.CONFLICT)) { > LOGGER.info("Failed to index document in Elasticsearch: " + > itemResp.getFailureMessage()); >} else { > hasFatalError = true; > LOGGER.error("Failed to index document in Elasticsearch: " + > itemResp.getFailureMessage()); >} > > - jeremiah > > On Tue, Feb 16, 2016 at 12:18 PM, Roger Hoover <roger.hoo...@gmail.com> > wrote: > > > Hi Jeremiah, > > > > There's currently no way to do that. I think the best way to modify the > > existing ElasticsearchSystemProducer would be to add a config option for > a > > callback to let you customize this behavior. Basically, a pluggable > > listener ( > > > > > https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java#L101 > > ). 
>>
>> On Mon, Feb 15, 2016 at 2:30 PM, jeremiah adams <jadams...@gmail.com> wrote:
>>> We have a samza job configured to run in a yarn cluster. This job consumes multiple kafka topics and routes the messages to elasticsearch for indexing. When enough batch updates to elasticsearch fail using the ElasticsearchSystemProducer, the entire samza job dies. Due to checkpointing + yarn, the job starts back up, starts reading where it left off, and dies again. Enter loop.
>>>
>>> Updates to ES are failing due to invalid data on the part of our consumers, but I can't always control them, so I need to be defensive in the code. I don't see how to handle this in any of the source examples. I would like to just trap this error and, if it is what I expect it to be, squash it. Can someone point me in the right direction?
>>>
>>> Below is the log where the failure occurs.
>>>
>>> 2016-02-15 18:55:26 ElasticsearchSystemProducer [ERROR] Unable to send message from TaskName-Partition 5 to system elastic.
>>> 2016-02-15 18:55:26 SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
>>> org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 5 to system elastic.
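The pluggable listener discussed above could expose the fatal/non-fatal decision as a predicate. A minimal sketch of that idea, with hypothetical names (this is not the actual Samza API, just a model of the check currently hard-coded in ElasticsearchSystemProducer):

```java
import java.util.function.IntPredicate;

// Hypothetical sketch of a pluggable error policy for bulk item failures.
// It models the status-code decision only; nothing here is the real
// Samza or Elasticsearch API.
public class BulkErrorPolicy {
    // Current producer behavior: only 409 (CONFLICT) is non-fatal.
    public static final IntPredicate DEFAULT = status -> status != 409;

    // Jeremiah's case: also treat 400 (BAD_REQUEST, e.g. mapping/message
    // conflicts) as log-and-continue rather than fatal.
    public static final IntPredicate TOLERATE_MAPPING_ERRORS =
        status -> status != 409 && status != 400;

    // Returns true when a failed bulk item should set hasFatalError.
    public static boolean isFatal(int httpStatus, IntPredicate policy) {
        return policy.test(httpStatus);
    }
}
```

A config option could then select which predicate the listener uses, keeping today's behavior as the default.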
Re: [DISCUSS] Moving to github/pull-request for code review and check-in
+1 - Thanks for bringing this up, Yi. I've done it both ways and feel pull requests are much easier.

Sent from my iPhone

> On Feb 18, 2016, at 4:25 PM, Navina Ramesh wrote:
>
> +1
>
> Haven't tried any contribution with pull requests, but it sounds simpler than attaching the patch to JIRA.
>
> Navina
>
>> On Thu, Feb 18, 2016 at 4:01 PM, Jacob Maes wrote:
>>
>> +1
>>
>> As a relatively new contributor to Samza, I've certainly felt the current process was overly complicated.
>>
>>> On Thu, Feb 18, 2016 at 3:53 PM, Yi Pan wrote:
>>>
>>> Hi, all,
>>>
>>> I want to start the discussion on our code review/commit process.
>>>
>>> I felt that our code review and check-in process is a little bit cumbersome:
>>> - developers need to create RBs and attach diffs to JIRA
>>> - committers need to review RBs, download and apply the diff, then push.
>>>
>>> It would be much lighter if we took the pull-request-only approach, as Kafka has already converted to:
>>> - for developers, the only thing needed is to open a pull request.
>>> - for committers, review and patch application happen on the same PR, and the merge can be done directly on the remote git repo.
>>>
>>> Of course, there might be some hookup scripts that we will need to link JIRA w/ pull requests in github, which Kafka already does. Any comments and feedback are welcome!
>>>
>>> Thanks!
>>>
>>> -Yi
>
> --
> Navina R.
Re: HTTP-based Elasticsearch system producer and reusable task
Hi Yi,

Please see my comments inline.

On Tue, Feb 9, 2016 at 10:08 PM, Yi Pan <nickpa...@gmail.com> wrote:
> Hi, Roger,
>
> Got it! I would like to understand more about the SystemProducer API changes required by #1 and #2. Could you elaborate a bit more?

For #1, the IndexRequestFactory interface (https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/IndexRequestFactory.java#L32) returns an object of type org.elasticsearch.action.index.IndexRequest, which comes from the elasticsearch jar (https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java).

For #2, the way to support updates and deletes would be to have the IndexRequestFactory return a more generic ActionRequest, which could be an index, update, or delete action (in which case it should be called ActionRequestFactory).

> Regarding the JDK8 requirement in the new HTTP-based Elasticsearch producer, I want to ask how you were motivated to go w/ JDK8. It does bring a lot more nice features. If we deprecate source-level compatibility with JDK7, we can benefit from a lot of new features in JDK8, like lambdas, the Stream API, etc. And refactoring Scala code to JDK8 is also much easier.

I really like that functions are first-class citizens in Java 8, and the Stream API is quite helpful as well. For example, first-class functions helped me avoid duplicate code that would have occurred because JEST doesn't expose a common ancestor type for each type of builder (index, update, delete). Passing in functions instead of objects with a common ancestor type solved the problem.

https://github.com/quantiply/rico/blob/master/samza-elasticsearch/src/main/java/com/quantiply/elasticsearch/HTTPBulkLoader.java#L237-L272

> Thanks!
> -Yi
>
> On Tue, Feb 9, 2016 at 4:19 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
>> Hi Yi,
>>
>> It could be merged into the Samza project if there's enough interest, but it may need some re-working depending on which dependencies are ok to bring in. I did it outside of the Samza project first because I had to get it done quickly, so it relies on Java 8 features, dropwizard metrics for histogram metrics, and JEST (https://github.com/searchbox-io/Jest), which itself drags in more dependencies (Guava, Gson, commons http).
>>
>> There are a few issues with the existing ElasticsearchSystemProducer:
>>
>> 1. The plugin API (IndexRequestFactory) is tied to the Elasticsearch Java API (a bulky dependency)
>> 2. It only supports index requests. I needed to also support updates and deletes.
>> 3. There is currently no plugin mechanism to register a flush listener. The reason I needed that was to be able to report end-to-end latency stats (total pipeline latency = commit time - event time).
>>
>> #3 is easily solvable with additional plugin options. #1 and #2 require changing the system producer API.
>>
>> Roger
>>
>> On Tue, Feb 9, 2016 at 10:56 AM, Yi Pan <nickpa...@gmail.com> wrote:
>>> Hi, Roger,
>>>
>>> That's awesome! Are you planning to submit the HTTP-based system producer to the open-source samza-elasticsearch module? If the Elasticsearch community suggests that HTTP-based clients are the recommended way, we should use them in samza-elasticsearch as well. And what's your opinion on the existing ElasticsearchSystemProducer? If the SystemProducer APIs and config options do not change, I would vote to replace the implementation w/ an HTTP-based ElasticsearchSystemProducer.
>>>
>>> Thanks for putting these new additions up!
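Issue #3 above (a plugin mechanism for a flush listener reporting end-to-end latency) could look roughly like this sketch. The interface and class names here are hypothetical illustrations, not an actual Samza API:

```java
// Hypothetical flush-listener plugin for issue #3: invoked after each
// successful bulk flush so a job can report total pipeline latency
// (commit time minus event origin time). Names are illustrative only.
interface FlushListener {
    void onFlush(long commitTimeMs, long... eventOriginTimesMs);
}

public class MaxLatencyReporter implements FlushListener {
    long maxLatencyMs = 0;

    @Override
    public void onFlush(long commitTimeMs, long... eventOriginTimesMs) {
        // total pipeline latency = commit time - event origin time
        for (long originMs : eventOriginTimesMs) {
            maxLatencyMs = Math.max(maxLatencyMs, commitTimeMs - originMs);
        }
    }
}
```

In a real producer the listener would presumably be instantiated from a config option, the same way IndexRequestFactory is today.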
>>>
>>> -Yi
>>>
>>> On Tue, Feb 9, 2016 at 10:39 AM, Roger Hoover <roger.hoo...@gmail.com> wrote:
>>>> Hi Samza folks,
>>>>
>>>> For people who want to use HTTP to integrate with Elasticsearch, I wrote an HTTP-based system producer and a reusable task, including latency stats from event origin time, task processing time, and time spent talking to the Elasticsearch API.
>>>>
>>>> https://github.com/quantiply/rico/blob/master/docs/common_tasks/es-push.md
>>>>
>>>> Cheers,
>>>>
>>>> Roger
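Roger's point about first-class functions replacing a missing common ancestor type can be shown with a small sketch. The builder classes below are stand-ins for illustration, not the real JEST types: because `submit` takes a `Function`, the index and delete builders need no shared supertype, so the dispatch code is written once.

```java
import java.util.function.Function;

// Stand-in builder types with no common ancestor, mirroring the JEST
// index/update/delete builders mentioned above (NOT the real JEST
// classes, just an illustration of the pattern).
class IndexBuilder  { String build(String doc) { return "index:" + doc; } }
class DeleteBuilder { String build(String id)  { return "delete:" + id; } }

public class ActionDispatch {
    // One generic code path: callers pass a method reference instead of
    // relying on a shared supertype, so no per-action code is duplicated.
    static String submit(String payload, Function<String, String> builder) {
        return builder.apply(payload);
    }
}
```

With Java 8 method references, `submit("doc1", new IndexBuilder()::build)` and `submit("42", new DeleteBuilder()::build)` go through the same code path.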
Re: ThreadJobFactory in production
Jose,

It would be great if you could share it. I'm interested in trying to use it as well.

Thanks,
Roger

On Wed, Mar 2, 2016 at 2:31 PM, José Barrueta wrote:
> Hi guys,
>
> At Stormpath, we made a custom Samza 0.10 version merging SAMZA-41 into it. It's working well, so we are thinking of updating that patch later this week so it can be added to the main project.
>
> HTH,
>
> Jose Luis Barrueta
>
> On Wed, Mar 2, 2016 at 2:11 PM, Yi Pan wrote:
>> Hi, Robert,
>>
>> The main reason that ThreadJobFactory and ProcessJobFactory are not considered "production-ready" is that there is only one container for the job and all tasks are assigned to that single container. Hence, it is not easy to scale out beyond a single host.
>>
>> As Rick mentioned, Netflix has put up a patch in SAMZA-41, based on 0.9.1, to allow static assignment of a subset of partitions to a single ProcessJob, which allows launching multiple ProcessJobs on different hosts. We planned to merge it into 0.10, but it turns out that too many changes have gone into 0.10 and it became difficult to merge the patch. At this point, we can still try the following two options:
>> 1) We can attempt to merge SAMZA-41 into 0.10.1 again; it may take some effort but would give a stop-gap solution.
>> 2) We are working on a standalone Samza model (SAMZA-516, SAMZA-881) to allow users to run Samza w/o depending on YarnJobFactory. This is a long-term effort and will take some time to flesh out. Please join the discussion there s.t. we can be more aligned in our efforts.
>>
>> Hope the above gives you an overall picture of where we are going.
>>
>> Thanks a lot!
>>
>> -Yi
>>
>> On Wed, Mar 2, 2016 at 1:28 PM, Rick Mangi wrote:
>>> There was an interesting thread a while back from, I believe, the Netflix guys about running ThreadJobFactory in production.
>>>
>>>> On Mar 2, 2016, at 4:20 PM, Robert Crim wrote:
>>>>
>>>> Hi,
>>>>
>>>> We're currently working on a solution that allows us to run Samza jobs on Mesos. This seems to be going well, and it's something we'd like to move away from when native Mesos support is added to Samza.
>>>>
>>>> While we're developing and testing our scheduler, I'm wondering about the implications of running tasks with the ThreadJobFactory in "production". The documentation advises against this, but it's not clear why.
>>>>
>>>> If we were using the ThreadJobFactory inside of a docker container on Mesos with Marathon for production, what would be our main problem? These are not particularly high-load tasks. Aside from not being able to get fine-grained resource scheduling per task, it seems like the main issue is not being able to easily tell when a job stops due to an error / exception.
>>>>
>>>> In other words, what would be show-stopping reasons to not use the ThreadJobFactory in production?
>>>>
>>>> Thanks,
>>>> Rob