Samza 13.1 and Kafka 1.0
Hi, Does anyone know if Samza 0.13.1 is compatible with Kafka 1.0? Thanks, Xiaochuan Yu
Exit code 248 from YARN and InterruptedException
Hi, I'm trying to debug a problem with a Samza job that loads data from a Cassandra database into Kafka via a custom System implementation. The System uses BlockingEnvelopeMap. In the logs I see an InterruptedException thrown from the put calls in the class that extends BlockingEnvelopeMap, after which the container exits with code 248: java.lang.InterruptedException: null at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220) at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:339) at org.apache.samza.util.BlockingEnvelopeMap.putAll(BlockingEnvelopeMap.java:199) ... There is almost nothing else in the logs, so I'm very puzzled about what could be causing this. Has anyone ever seen exit code 248? Could a problem with Kafka cause something like this? Thanks, Xiaochuan Yu
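The trace shows LinkedBlockingQueue.put (called from BlockingEnvelopeMap.putAll) throwing InterruptedException. put() does that whenever the calling thread's interrupt flag is set, whether the interrupt arrives before the call, while acquiring the lock (as in this trace), or while waiting for free space. The log does not say who interrupted the thread, so the following is only a JDK-only model (not Samza code; `InterruptedPutDemo` and `runAndInterrupt` are hypothetical names) of a feeder thread being interrupted inside put(), handled the conventional way: restore the interrupt flag and exit the loop instead of letting the exception escape.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// JDK-only model, not Samza code: the "feeder" plays the role of a System's poller
// thread handing messages to a bounded LinkedBlockingQueue.
class InterruptedPutDemo {

    // Starts a feeder that keeps calling put() on a queue nobody drains, so it ends up
    // blocked, then interrupts it. Returns true if the feeder caught the
    // InterruptedException, restored its interrupt flag and terminated.
    static boolean runAndInterrupt(int capacity) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        AtomicBoolean handled = new AtomicBoolean(false);

        Thread feeder = new Thread(() -> {
            try {
                for (long i = 0; ; i++) {
                    queue.put("msg-" + i); // blocks once the queue is full
                }
            } catch (InterruptedException e) {
                // Don't swallow the interrupt: re-set the flag so the thread's owner can
                // see it was asked to stop, then leave the loop cleanly.
                Thread.currentThread().interrupt();
                handled.set(true);
            }
        }, "feeder");
        feeder.start();

        try {
            while (queue.remainingCapacity() > 0) {
                Thread.sleep(1); // wait until the feeder has filled the queue
            }
            feeder.interrupt();
            feeder.join(TimeUnit.SECONDS.toMillis(5));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return handled.get() && !feeder.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("feeder handled interrupt: " + runAndInterrupt(10));
    }
}
```

In a real BlockingEnvelopeMap subclass the same catch would wrap the put/putAll calls in the polling thread; whether the interrupt here came from a shutdown or from something else is still an open question.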
Re: Samza Job Slow to Restart
Hi, We finally found out why the job takes so long to start. There was higher-than-normal network IO during job startup, so we checked the size of the checkpoint topic on disk, and it was ~21GB. We then restarted the Kafka node that was the leader for the checkpoint topic; the topic's disk size went down to ~1.8GB and the job started up fairly quickly. It's probably due to a bug in Kafka where the log cleaner died without us noticing: https://issues.apache.org/jira/browse/KAFKA-3894. We have since been working on upgrading Kafka to avoid this bug. Hope this helps if anyone else ever runs into it. Thanks, Xiaochuan Yu On Sat, Sep 23, 2017 at 6:17 PM XiaoChuan Yu wrote: > >> How long does it take? > It took around 10 minute from "Got offset 0 for topic > ..." to init() being called on the Task. > > >> Have you measured which parts of the start up sequence take the most > time? > >> - is it checkpoint restoration, or restore of local state? > Should be checkpoint restoration. There is no local state for this job. > > >> If reading from the checkpoint topic takes the most time, then I'd > >> recommend reading from the beginning from that topic, and benchmarking > how > >> long it takes? It'll also help to verify if the checkpoint topic is > >> actually log-compacted. > I'm not sure how to verify how much the topic is compacted by Kafka. > The cleanup policy is to compact though. > > >> Do containers eventually start? Or does the start-up hang? > >> If so, a thread dump will be useful. > It does eventually start up. > > >> Can you please link and attach the entire log file for us to take a > look? > Unfortunately there is too much stuff for me to redact from the log right > now.
> However, I can tell you that the job has two input topics both with the > following settings: > systems.kafka.streams.my-special-topic.samza.reset.offset=true > systems.kafka.streams.my-special-topic.samza.offset.default=upcoming > It was thought that this would speedup startup of the job to no avail. > > On Wed, Sep 20, 2017 at 3:21 PM Jagadish Venkatraman < > jagadish1...@gmail.com> wrote: > >> Hi Xiaochuan, >> >> >> What does that loop do exactly? >> >> Most of what the run-loop does is documented in >> https://samza.apache.org/learn/documentation/0.9/container/event-loop.html >> >> >> We are running into a problem where it seems to take a very long time >> to >> restart a Samza job. >> >> Some follow-up questions, >> >> How long does it take? >> Have you measured which parts of the start up sequence take the most time? >> - is it checkpoint restoration, or restore of local state? >> If reading from the checkpoint topic takes the most time, then I'd >> recommend reading from the beginning from that topic, and benchmarking how >> long it takes? It'll also help to verify if the checkpoint topic is >> actually log-compacted. >> Do containers eventually start? Or does the start-up hang? If so, a thread >> dump will be useful. >> Can you please link and attach the entire log file for us to take a look? >> >> >> 3. Any ideas on how to fix this? >> >> We can perhaps, try to narrow down where the time is spent in startup from >> the logs? Depending on that, I can suggest a fix :-) >> >> Thanks, >> Jagadish >> >> On Wed, Sep 20, 2017 at 11:21 AM, XiaoChuan Yu >> wrote: >> >> > Hi, >> > >> > We are running into a problem where it seems to take a very long time to >> > restart a Samza job. >> > We are using Samza 0.9.1 at the moment. 
>> > >> > From the logs for a particular container it looks like it has something >> to >> > do with reading checkpoints from Kafka: >> > >> > 2017-09-20 03:21:02.060 INFO o.a.s.c.kafka.KafkaCheckpointManager >> [main] >> > - >> > Got offset 0 for topic __samza_checkpoint_ver_1_for_test-job_1 and >> > partition 0. Attempting to fetch messages for checkpoint log. >> > 2017-09-20 03:21:02.072 INFO o.a.s.c.kafka.KafkaCheckpointManager >> [main] >> > - >> > Get latest offset 42890599 for topic >> > __samza_checkpoint_ver_1_for_test-job_1 and partition 0. >> > >> > Looking at this line in KafkaCheckpointManager >> > <https://github.com/apache/samza/blob/0.9.1/samza-kafka/ >> > src/main/scala/org/apache/samza/checkpoint/kafka/ >> > KafkaCheckpointManager.scala#L275>, >> > it seems to indicate that the loop iterates from 0 to 42890599 and make >> > requests for each. >> > >> > Questions: >> > 1. What does that loop do exactly? >> > 2. Is this an expected behaviour? Is "Got offset 0 for topic ..." >> normal? >> > 3. Any ideas on how to fix this? >> > >> > Thanks, >> > Xiaochuan Yu >> > >> >> >> >> -- >> Jagadish V, >> Graduate Student, >> Department of Computer Science, >> Stanford University >> >
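The ~21GB versus ~1.8GB numbers fit what compaction is supposed to do for a checkpoint topic: as far as startup is concerned, only the newest checkpoint per key matters, so a healthy cleaner keeps the topic small, while a dead cleaner (KAFKA-3894) leaves every historical checkpoint for startup to read. A toy Java model of that difference (the class `CompactionModel` is hypothetical, not Kafka's LogCleaner):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model, not Kafka's LogCleaner: a partition is a list of keyed records and
// "compaction" keeps only the newest record per key. With the cleaner dead, nothing is
// removed and anyone restoring from offset 0 must read the whole history.
class CompactionModel {

    static final class Rec {
        final String key;   // e.g. a task name in the checkpoint topic
        final String value; // e.g. a serialized checkpoint
        Rec(String key, String value) { this.key = key; this.value = value; }
    }

    // Builds a log in which each of `tasks` keys is checkpointed `commits` times.
    static List<Rec> checkpointLog(int tasks, int commits) {
        List<Rec> log = new ArrayList<>();
        for (int c = 0; c < commits; c++) {
            for (int t = 0; t < tasks; t++) {
                log.add(new Rec("task-" + t, "commit-" + c));
            }
        }
        return log;
    }

    // Keeps only the last record for each key, preserving the order of the survivors.
    static List<Rec> compact(List<Rec> log) {
        Map<String, Integer> lastIndex = new HashMap<>();
        for (int i = 0; i < log.size(); i++) {
            lastIndex.put(log.get(i).key, i);
        }
        List<Rec> kept = new ArrayList<>();
        for (int i = 0; i < log.size(); i++) {
            if (lastIndex.get(log.get(i).key) == i) {
                kept.add(log.get(i));
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Rec> log = checkpointLog(4, 100_000);
        System.out.println("records to read without cleaning: " + log.size());
        System.out.println("records to read after compaction: " + compact(log).size());
    }
}
```

The real cleaner works segment by segment and never cleans the active segment, so even a healthy checkpoint topic keeps some superseded records; the model ignores that detail.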
Historical container logs in YARN UI
Hi, Is there a way to view historical container logs in the YARN UI? When I try to view historical logs from the YARN UI right now, I get the following message: Failed while trying to construct the redirect url to the log server. Log Server url may not be configured ... I have configured log aggregation and the timeline server in YARN. I know there's a history server implementation for MapReduce. Is there a similar history server implementation available for Samza? Thanks, Xiaochuan Yu
Re: Deploying Samza Jobs Using S3 and YARN on AWS
I found out that it was necessary to include "hadoop-aws" as part of the package submitted to YARN, similar to the instructions for deploying from HDFS <https://samza.apache.org/learn/tutorials/0.7.0/deploy-samza-job-from-hdfs.html>. However, due to a dependency conflict on the AWS SDK between our code and "hadoop-aws", we can't actually include it. We are now planning to use HTTP FS instead. On Fri, Sep 15, 2017 at 2:45 PM Jagadish Venkatraman wrote: > Thank you Xiaochuan for your question! > > You should ensure that *every machine in your cluster* has the S3 jar file > in its YARN class-path. From your error, it looks like the machine you are > running on does not have the JAR file corresponding to *S3AFileSystem*. > > >> Whats the right way to set this up? Should I just copy over the required > AWS jars to the Hadoop conf directory > > I'd lean on the side of simplicity and the *scp* route seems to address > most of your needs. > > >> Should I be editing run-job.sh or run-class.sh? > > You should not have to edit any of these files. Once you fix your > class-paths by copying those relevant JARs, it should just work. > > Please let us know if you need more assistance. > > -- > Jagdish > > > On Fri, Sep 15, 2017 at 11:07 AM, XiaoChuan Yu > wrote: > > > Hi, > > > > I'm trying to deploy a Samza job using YARN and S3 where I upload the zip > > package to S3 and point yarn.package.path to it. > > Does anyone know what kind of set up steps is required for this? > > > > What I've tried so far is to get Hello Samza to be run this way in AWS.
> > > > However I ran into the following exception: > > Exception in thread "main" java.lang.RuntimeException: > > java.lang.ClassNotFoundException: Class > > org.apache.hadoop.fs.s3a.S3AFileSystem not found > > at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2112) > > at org.apache.hadoop.fs.FileSystem.getFileSystemClass( > > FileSystem.java:2578) > > at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591) > > at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) > > ... > > > > Running "$YARN_HOME/bin/yarn classpath" gives the following: > > /home/ec2-user/deploy/yarn/etc/hadoop > > /home/ec2-user/deploy/yarn/etc/hadoop > > /home/ec2-user/deploy/yarn/etc/hadoop > > /home/ec2-user/deploy/yarn/share/hadoop/common/lib/* > > /home/ec2-user/deploy/yarn/share/hadoop/common/* > > /home/ec2-user/deploy/yarn/share/hadoop/hdfs > > /home/ec2-user/deploy/yarn/share/hadoop/hdfs/lib/* > > /home/ec2-user/deploy/yarn/share/hadoop/hdfs/* > > /home/ec2-user/deploy/yarn/share/hadoop/yarn/lib/* > > /home/ec2-user/deploy/yarn/share/hadoop/yarn/* > > /home/ec2-user/deploy/yarn/share/hadoop/mapreduce/lib/* > > /home/ec2-user/deploy/yarn/share/hadoop/mapreduce/* > > /contrib/capacity-scheduler/*.jar > > /home/ec2-user/deploy/yarn/share/hadoop/yarn/* > > /home/ec2-user/deploy/yarn/share/hadoop/yarn/lib/* > > > > I manually copied the required AWS related jars to > > /home/ec2-user/deploy/yarn/share/hadoop/common. > > I checked that it is loadable by running "yarn > > org.apache.hadoop.fs.s3a.S3AFileSystem" which gives the "Main method not > > found" error instead of class not found. > > > > From the console output of run-job.sh I see the following in class path: > > 1. All jars under the lib directory of the zip package > > 2. /home/ec2-user/deploy/yarn/etc/hadoop (Hadoop conf directory) > > > > The class path from run-job.sh seem to be missing the AWS related jars > > required for S3AFileSystem. > > Whats the right way to set this up? 
> > Should I just copy over the required AWS jars to the Hadoop conf > directory > > (2.)? > > Should I be editing run-job.sh or run-class.sh? > > > > Thanks, > > Xiaochuan Yu > > > > > > -- > Jagadish V, > Graduate Student, > Department of Computer Science, > Stanford University >
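Since the blocker turned out to be an AWS SDK version conflict, it can help to see which JAR a class is actually loaded from in a given JVM. A JDK-only sketch (the class `WhichJar` is hypothetical) that asks the class's ProtectionDomain for its CodeSource:

```java
import java.security.CodeSource;

// Hypothetical helper: reports where a class was loaded from in the current JVM.
class WhichJar {

    // Returns the JAR or directory URL the class came from, "bootstrap" when there is no
    // CodeSource (as for JDK core classes), or a short error description.
    static String locate(String className) {
        try {
            // initialize=false: load the class without running its static initializers.
            Class<?> cls = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "bootstrap";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "not found";
        } catch (LinkageError e) {
            return "failed to load: " + e;
        }
    }

    public static void main(String[] args) {
        for (String name : args) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

Run with the job's class path and arguments such as `org.apache.hadoop.fs.s3a.S3AFileSystem` and an AWS SDK class (for example `com.amazonaws.services.s3.AmazonS3Client`), it would show which SDK JAR wins; comparing that with the SDK version hadoop-aws was built against is one way to confirm the conflict.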
Re: Samza Job Slow to Restart
>> How long does it take? It took around 10 minutes from "Got offset 0 for topic ..." to init() being called on the Task. >> Have you measured which parts of the start up sequence take the most time? >> - is it checkpoint restoration, or restore of local state? Should be checkpoint restoration. There is no local state for this job. >> If reading from the checkpoint topic takes the most time, then I'd >> recommend reading from the beginning from that topic, and benchmarking how >> long it takes? It'll also help to verify if the checkpoint topic is >> actually log-compacted. I'm not sure how to verify how much of the topic has been compacted by Kafka. The cleanup policy is compact, though. >> Do containers eventually start? Or does the start-up hang? >> If so, a thread dump will be useful. It does eventually start up. >> Can you please link and attach the entire log file for us to take a look? Unfortunately there is too much stuff for me to redact from the log right now. However, I can tell you that the job has two input topics, both with the following settings: systems.kafka.streams.my-special-topic.samza.reset.offset=true systems.kafka.streams.my-special-topic.samza.offset.default=upcoming We had hoped this would speed up startup of the job, but it did not help. On Wed, Sep 20, 2017 at 3:21 PM Jagadish Venkatraman wrote: > Hi Xiaochuan, > > >> What does that loop do exactly? > > Most of what the run-loop does is documented in > https://samza.apache.org/learn/documentation/0.9/container/event-loop.html > > >> We are running into a problem where it seems to take a very long time to > restart a Samza job. > > Some follow-up questions, > > How long does it take? > Have you measured which parts of the start up sequence take the most time? > - is it checkpoint restoration, or restore of local state? > If reading from the checkpoint topic takes the most time, then I'd > recommend reading from the beginning from that topic, and benchmarking how > long it takes?
It'll also help to verify if the checkpoint topic is > actually log-compacted. > Do containers eventually start? Or does the start-up hang? If so, a thread > dump will be useful. > Can you please link and attach the entire log file for us to take a look? > > >> 3. Any ideas on how to fix this? > > We can perhaps, try to narrow down where the time is spent in startup from > the logs? Depending on that, I can suggest a fix :-) > > Thanks, > Jagadish > > On Wed, Sep 20, 2017 at 11:21 AM, XiaoChuan Yu > wrote: > > > Hi, > > > > We are running into a problem where it seems to take a very long time to > > restart a Samza job. > > We are using Samza 0.9.1 at the moment. > > > > From the logs for a particular container it looks like it has something > to > > do with reading checkpoints from Kafka: > > > > 2017-09-20 03:21:02.060 INFO o.a.s.c.kafka.KafkaCheckpointManager [main] > > - > > Got offset 0 for topic __samza_checkpoint_ver_1_for_test-job_1 and > > partition 0. Attempting to fetch messages for checkpoint log. > > 2017-09-20 03:21:02.072 INFO o.a.s.c.kafka.KafkaCheckpointManager [main] > > - > > Get latest offset 42890599 for topic > > __samza_checkpoint_ver_1_for_test-job_1 and partition 0. > > > > Looking at this line in KafkaCheckpointManager > > <https://github.com/apache/samza/blob/0.9.1/samza-kafka/ > > src/main/scala/org/apache/samza/checkpoint/kafka/ > > KafkaCheckpointManager.scala#L275>, > > it seems to indicate that the loop iterates from 0 to 42890599 and make > > requests for each. > > > > Questions: > > 1. What does that loop do exactly? > > 2. Is this an expected behaviour? Is "Got offset 0 for topic ..." normal? > > 3. Any ideas on how to fix this? > > > > Thanks, > > Xiaochuan Yu > > > > > > -- > Jagadish V, > Graduate Student, > Department of Computer Science, > Stanford University >
Samza Job Slow to Restart
Hi, We are running into a problem where it seems to take a very long time to restart a Samza job. We are using Samza 0.9.1 at the moment. From the logs for a particular container, it looks like it has something to do with reading checkpoints from Kafka: 2017-09-20 03:21:02.060 INFO o.a.s.c.kafka.KafkaCheckpointManager [main] - Got offset 0 for topic __samza_checkpoint_ver_1_for_test-job_1 and partition 0. Attempting to fetch messages for checkpoint log. 2017-09-20 03:21:02.072 INFO o.a.s.c.kafka.KafkaCheckpointManager [main] - Get latest offset 42890599 for topic __samza_checkpoint_ver_1_for_test-job_1 and partition 0. Looking at this line in KafkaCheckpointManager <https://github.com/apache/samza/blob/0.9.1/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala#L275>, it seems to indicate that the loop iterates from 0 to 42890599 and makes a request for each offset. Questions: 1. What does that loop do exactly? 2. Is this expected behaviour? Is "Got offset 0 for topic ..." normal? 3. Any ideas on how to fix this? Thanks, Xiaochuan Yu
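On question 1: judging from the two log lines, startup reads the checkpoint partition from its earliest offset (0) up to the latest one (42890599) and keeps only the newest checkpoint for each task. A simplified Java model of that replay follows; `CheckpointReplay`, `restore` and the `readAt` function are hypothetical, `latest` is treated as exclusive (an assumption), and a real Kafka consumer fetches records in batches rather than one request per offset. The point it illustrates is that the work grows with the number of records between the two offsets.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;

// Simplified model, not Samza's KafkaCheckpointManager. `readAt` stands in for fetching
// one record from Kafka and returns null for offsets that compaction has removed.
class CheckpointReplay {

    static final class Checkpoint {
        final String task;      // which task this checkpoint belongs to
        final long inputOffset; // input offset recorded by that checkpoint
        Checkpoint(String task, long inputOffset) { this.task = task; this.inputOffset = inputOffset; }
    }

    // Visits every offset in [earliest, latest); later records overwrite earlier ones, so
    // the result holds the last checkpoint written for each task.
    static Map<String, Long> restore(long earliest, long latest, LongFunction<Checkpoint> readAt) {
        Map<String, Long> newestByTask = new HashMap<>();
        for (long offset = earliest; offset < latest; offset++) {
            Checkpoint cp = readAt.apply(offset);
            if (cp != null) {
                newestByTask.put(cp.task, cp.inputOffset);
            }
        }
        return newestByTask;
    }
}
```

With a well-compacted topic most superseded offsets are gone, so far fewer records come back; with an uncompacted one, all ~43 million do.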
Deploying Samza Jobs Using S3 and YARN on AWS
Hi, I'm trying to deploy a Samza job using YARN and S3, where I upload the zip package to S3 and point yarn.package.path to it. Does anyone know what setup steps are required for this? So far I've tried to run Hello Samza this way in AWS. However, I ran into the following exception: Exception in thread "main" java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2112) at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) ... Running "$YARN_HOME/bin/yarn classpath" gives the following: /home/ec2-user/deploy/yarn/etc/hadoop /home/ec2-user/deploy/yarn/etc/hadoop /home/ec2-user/deploy/yarn/etc/hadoop /home/ec2-user/deploy/yarn/share/hadoop/common/lib/* /home/ec2-user/deploy/yarn/share/hadoop/common/* /home/ec2-user/deploy/yarn/share/hadoop/hdfs /home/ec2-user/deploy/yarn/share/hadoop/hdfs/lib/* /home/ec2-user/deploy/yarn/share/hadoop/hdfs/* /home/ec2-user/deploy/yarn/share/hadoop/yarn/lib/* /home/ec2-user/deploy/yarn/share/hadoop/yarn/* /home/ec2-user/deploy/yarn/share/hadoop/mapreduce/lib/* /home/ec2-user/deploy/yarn/share/hadoop/mapreduce/* /contrib/capacity-scheduler/*.jar /home/ec2-user/deploy/yarn/share/hadoop/yarn/* /home/ec2-user/deploy/yarn/share/hadoop/yarn/lib/* I manually copied the required AWS-related jars to /home/ec2-user/deploy/yarn/share/hadoop/common. I checked that the class is loadable by running "yarn org.apache.hadoop.fs.s3a.S3AFileSystem", which gives a "Main method not found" error instead of class not found. From the console output of run-job.sh, I see the following in the class path: 1. All jars under the lib directory of the zip package 2. /home/ec2-user/deploy/yarn/etc/hadoop (Hadoop conf directory) The class path from run-job.sh seems to be missing the AWS-related jars required for S3AFileSystem. What's the right way to set this up? Should I just copy the required AWS jars to the Hadoop conf directory (2.)? Should I be editing run-job.sh or run-class.sh? Thanks, Xiaochuan Yu
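To confirm what the run-job.sh JVM actually sees, one option is to filter its `java.class.path` for the JARs in question. A small JDK-only sketch; `ClassPathScan` is a hypothetical name, and the substrings `hadoop-aws` and `aws-java-sdk` are assumed JAR-name patterns, not something the thread confirms:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists class path entries whose file name contains a substring.
class ClassPathScan {

    // Splits a class path on the platform separator (":" on Linux) and keeps entries whose
    // last path component contains `needle`. Wildcard entries such as ".../common/*" are
    // matched by name only; the JARs behind them are not inspected.
    static List<String> entriesContaining(String classPath, String needle) {
        List<String> hits = new ArrayList<>();
        for (String entry : classPath.split(File.pathSeparator)) {
            if (!entry.isEmpty() && new File(entry).getName().contains(needle)) {
                hits.add(entry);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        String cp = System.getProperty("java.class.path", "");
        for (String needle : new String[] {"hadoop-aws", "aws-java-sdk"}) {
            System.out.println(needle + " -> " + entriesContaining(cp, needle));
        }
    }
}
```

An empty result for both patterns, when run with the same class path run-job.sh prints, would match the ClassNotFoundException above.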
Re: Kafka 0.10.2 compatibility with Samza 0.13.1
Hi Yi, Thanks for all the suggestions. Our team eventually decided to move off CDH Kafka (and YARN) due to some issues not relevant to Samza. For what it's worth, I did successfully test a couple of basic Hello Samza (Samza 0.13.1) jobs against a single-node CDH Kafka, version 0.10.2.0+kafka2.2.0+110. The testing was done on AWS EC2 instances running Red Hat 7. We also found out that running against CDH YARN required various dependency overrides, such as the ones in Hello Samza's "cdh5" build profile <https://github.com/apache/samza-hello-samza/blob/master/pom.xml#L282-L297>. Thanks, Xiaochuan Yu On Fri, Sep 1, 2017 at 1:18 PM Yi Pan wrote: > Hi, XiaoChuan, > > Just confirmed with Jiangjie Qin (our in-house Kafka PMC), Kafka client > 0.10.1.1 does not work with broker 0.10.0 due to new protocol version > introduced in 0.10.1.1. Hence, if you try to run Samza 0.13.1 against Kafka > broker 0.10.0 version, it won't work. The best option for that seems to be > swapping out the runtime Kafka client 0.10.1.1 version in Samza 0.13.1 to > Kafka client 0.10.0.1 (which was used in Samza 0.12). Since there is no > incompatible API calls to Kafka client lib between Samza 0.12.0 and Samza > 0.13.1, that should work and is worth trying out. > > Let us know if we can be further assistance here. > > Thanks! > > -Yi > > On Thu, Aug 31, 2017 at 4:14 PM, Yi Pan wrote: > > > Hi, Xiaochuan, > > > > I am relaying your question to our in-house Kafka expert to confirm. But > > as far as I can tell from the description, the main breaking changes are > in > > the KStreams API. Samza only uses the standard Kafka consumer/producer > APIs > > now and should not be affected. > > > > I would recommend to give it a try in your question 1. > > > > As for your question 2, are you referring to a) binding a Kafka 0.10.0.0 > > client library with Samza 0.13.1? Or b) running Samza 0.13.1 against > Kafka > > broker version 0.10.0.0?
> > > > If you are referring to a), from the commit history, there is no source > > code level changes when we upgrade the dependency from Kafka 0.10.0.1 to > > 0.10.1.1 in Samza 0.13.1. If you want to make sure that works, you can > try > > to downgrade the Kafka version in your build environment to 0.10.0.0 and > > build your app to see whether it breaks the build. If the build passes, > the > > client should work w/ Samza 0.13.1, unless Kafka client has an API that > > changes runtime behavior between 0.10.0.1 and 0.10.1.1. > > > > If you are referring to b), it is not guaranteed that the higher version > > of Kafka client library will work with an older version of Kafka broker. > > However, Samza has been very conservative in the usage of new features in > > Kafka client library. Hence, you would have a higher chance of success to > > run Samza 0.13.1 against Kafka 0.10.0 brokers. > > > > Unfortunately, we have not test all the above combinations internally. I > > would wait for the confirmation from our Kafka expert before I can give > you > > a definite answer. > > > > Thanks! > > > > -Yi > > >
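Yi's rule of thumb is that a client library newer than the broker is the risky direction (a 0.10.1.1 client does not work against 0.10.0 brokers). A hypothetical deployment-check helper that flags that combination is sketched below. It only compares version numbers and knows nothing about Kafka's wire protocol, so it serves as a reminder, not a compatibility test.

```java
// Hypothetical helper: compares dotted version strings numerically and flags the
// client-newer-than-broker combination. It knows nothing about Kafka's wire protocol.
class KafkaVersionCheck {

    // Drops vendor suffixes such as "+kafka2.2.0+110" (CDH) or "-SNAPSHOT", then compares
    // the remaining numeric parts left to right; missing parts count as 0.
    static int compare(String a, String b) {
        String[] pa = a.split("[+-]")[0].split("\\.");
        String[] pb = b.split("[+-]")[0].split("\\.");
        for (int i = 0; i < Math.max(pa.length, pb.length); i++) {
            int x = i < pa.length ? Integer.parseInt(pa[i]) : 0;
            int y = i < pb.length ? Integer.parseInt(pb[i]) : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    // True when the client library is newer than the broker, the direction the thread
    // says is not guaranteed to work.
    static boolean clientNewerThanBroker(String clientVersion, String brokerVersion) {
        return compare(clientVersion, brokerVersion) > 0;
    }

    public static void main(String[] args) {
        System.out.println("0.10.1.1 client vs 0.10.0.0 broker risky: "
                + clientNewerThanBroker("0.10.1.1", "0.10.0.0"));
    }
}
```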
Kafka 0.10.2 compatibility with Samza 0.13.1
Hi, I have a few questions about Kafka version compatibility: 1. Is Samza 0.13.1 compatible with Kafka 0.10.2? I see that Samza is compiled against Kafka 0.10.1.1 right now, and from the Kafka docs <http://kafka.apache.org/0102/documentation.html#upgrade_1020_streams> it looks like there are significant changes in 0.10.2. 2. Would Kafka 0.10.0.0 work with Samza 0.13.1? Does anyone have any experience with this? The reason for my question is that we are setting up a fresh Samza stack and are planning to use Cloudera's distribution of Kafka, which is based on a subset of Kafka versions. Cloudera Kafka version table: https://www.cloudera.com/documentation/kafka/latest/topics/kafka_packaging.html. Thanks, Xiaochuan Yu
Re: Custom ordering when using async
Hi Jagadish, This is a rather late reply, but I don't think I understand the effect of changing the job.container.thread.pool.size config in the synchronous case very well. Suppose I have an input topic partitioned by memberID and processed by a synchronous job. Does increasing the container thread pool size still guarantee that messages with the same key are processed serially? For example, if I had two messages with the same key, msg1 followed by msg2, is it guaranteed that msg1 will finish processing before processing of msg2 starts? My actual situation is that we currently have a job that has maxed out the allowed number of containers but is still not processing messages fast enough. We want to speed up processing, but we would like to avoid increasing the number of partitions of the input topic. The job is IO-bound and requires that messages with the same key are processed serially. Thanks, Xiaochuan Yu On Thu, Aug 10, 2017 at 11:28 PM Jagadish Venkatraman < jagadish1...@gmail.com> wrote: > Hi XiaoChuan, > > Are you setting task.max.concurrency > 1 that allows multiple messages > in-flight? (The "keyed executor pool" is only meaningful with that > scenario) > > Also, Have you tried increasing your *job.container.thread.pool.size > *config > and setting it to the number of tasks in the container? Given that your > input topic is already partitioned by memberID, it'll probably be simpler > to try this first, benchmark your QPS and see if it meets your performance > goals. I'd tune these config-knobs first and confirm that you need the > "keyed executor thread pool". You may find that it introduces more > complexity. > > Please let us know if you had further questions. We are happy to further > help you tune your job for maximum performance. > > > > On Wed, Aug 9, 2017 at 4:03 PM, xinyu liu wrote: > > > Hi, XiaoChuan, > > > > For your questions: > > > > 1.
By "keyed single thread executor pool", it means something like a map > > from a key to a single thread executor, where > > each Executor is created by Executors.newSingleThreadExecutor() > > <https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newSingleThreadExecutor()>. This means for a particular key, it will be executed in a designated > > thread, which guarantees the ordering of the key. > > > > 2. For your use case, you can create the above keyed executors by setting > > the key being some hash of the user id. For example: > > > > Map<Integer, Executor> keyedExecutors = new HashMap<>(); > > > > in processAsync(): > > String memberId = ...; // the member ID of the incoming message > > int hash = memberId.hashCode(); // you can reduce the hash size by % > > Executor executor = keyedExecutors.get(hash); > > if (executor == null) { > > executor = Executors.newSingleThreadExecutor(); > > keyedExecutors.put(hash, executor); > > } > > > > executor.execute(() -> { /* process your message here */ }); > > ... > > > > So the same user will always be executed in a single thread, which ensures > > the ordering. Does this make sense to you? > > > > Thanks, > > Xinyu > > > > > > > > On Wed, Aug 9, 2017 at 10:07 AM, XiaoChuan Yu > > wrote: > > > > > Hi, > > > > > > I have a few questions regarding the order of processing when using > > > processAsync. > > > > > > From the LinkedIn article here > > > <https://engineering.linkedin.com/blog/2017/01/asynchronous- > > > processing-and-multithreading-in-apache-samza--part> > > > it > > > mentions the following: > > > "For parallelism within a task, Samza guarantees processAsync will be > > > invoked in order for a task. The processing or completion, however, can > > go > > > out of order. With this guarantee, users can implement sub-task-level > > data > > > pipelining with customized ordering and parallelism.
For example, users > > can > > > use a keyed single thread executor pool to have in-order processing per > > key > > > while processing messages with different keys in parallel." > > > > > > 1. What exactly is meant by a "keyed single thread executor pool"? Are > > > there any code examples available on what this looks like? > > > 2. I need to process a stream keyed on user IDs in parallel using > > > processAsync but would like each user's event be processed in order. > Does > > > this then require custom ordering logic mentioned in the article? > > > > > > Thanks, > > > Xiaochuan Yu > > > > > > > > > -- > Jagadish V, > Graduate Student, > Department of Computer Science, > Stanford University >
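Xinyu's snippet keeps one executor per distinct hash. A variant with a fixed number of single-thread executors (the "reduce the hash size by %" option he mentions) could look like the following Java sketch; `KeyedExecutorPool` is a hypothetical name, not a Samza class. Tasks for the same key always go to the same thread, so they run in submission order, while keys that land on different executors run in parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not a Samza class: a fixed set of single-thread executors with
// each key pinned to one of them by hash.
class KeyedExecutorPool implements AutoCloseable {
    private final List<ExecutorService> executors = new ArrayList<>();

    KeyedExecutorPool(int size) {
        for (int i = 0; i < size; i++) {
            executors.add(Executors.newSingleThreadExecutor());
        }
    }

    // Same key -> same executor -> tasks for that key run one at a time, in submission
    // order. floorMod keeps the index non-negative because hashCode() can be negative.
    void submit(String key, Runnable task) {
        int index = Math.floorMod(key.hashCode(), executors.size());
        executors.get(index).execute(task);
    }

    // Lets already-submitted work finish, then stops the threads.
    @Override
    public void close() {
        executors.forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService e : executors) {
                e.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Inside processAsync, each message would be submitted under its member ID, and the submitted task would call `callback.complete()` (or `callback.failure(e)`) on Samza's TaskCallback when it finishes. Keys that share an executor are also serialized with each other, which is the price of bounding the thread count.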
Custom ordering when using async
Hi, I have a few questions regarding the order of processing when using processAsync. The LinkedIn article here <https://engineering.linkedin.com/blog/2017/01/asynchronous-processing-and-multithreading-in-apache-samza--part> mentions the following: "For parallelism within a task, Samza guarantees processAsync will be invoked in order for a task. The processing or completion, however, can go out of order. With this guarantee, users can implement sub-task-level data pipelining with customized ordering and parallelism. For example, users can use a keyed single thread executor pool to have in-order processing per key while processing messages with different keys in parallel." 1. What exactly is meant by a "keyed single thread executor pool"? Are there any code examples of what this looks like? 2. I need to process a stream keyed on user IDs in parallel using processAsync, but I would like each user's events to be processed in order. Does this require the custom ordering logic mentioned in the article? Thanks, Xiaochuan Yu
Re: Steps to Upgrading Samza (0.9 to 0.12)
If you already have 2.6.0 installed (as you have said), I believe you will be fine (but I'm not sure). *Kafka version:* - Samza 0.12 upgraded the version of Kafka to 0.10. - If your Kafka brokers are on an older version of Kafka, you should upgrade them to at least 0.10. Kafka clients are usually incompatible with older versions of brokers. *Java version:* - Samza 0.12 binaries are compiled using Java 8. Hence, they cannot be run on older versions of the Java runtime. > I'm extremely new to Samza in terms of operations aspect. I'm not sure what information would be relevant in this case so please ask away. I'd first start by upgrading the Kafka brokers (assuming you're on Java 8+ already). Let us know how the migration goes! Thanks, Jagadish On Fri, Mar 24, 2017 at 8:23 PM, XiaoChuan Yu wrote: > Hi, > What are the general steps for upgrading Samza from 0.9 to 0.12? > Do I need to upgrade Kafka and/or YARN? > I don't know how Samza was setup initially but we currently have the following setup: > Samza version: 0.9.1 > YARN version: Hadoop 2.6.0-cdh5.4.8 > Kafka version: 0.9.0.1 > I think installation of Kafka and YARN were managed through Puppet. > I'm extremely new to Samza in terms of operations aspect. I'm not sure what information would be relevant in this case so please ask away. > Thanks, > Xiaochuan Yu -- Jagadish V, Graduate Student, Department of Computer Science, Stanford University -- Tommy Becker Senior Software Engineer O +1 919.460.4747 tivo.com This email and any attachments may contain confidential and privileged material for the sole use of the intended recipient. Any review, copying, or distribution of this email (or any attachments) by others is prohibited. If you are not the intended recipient, please contact the sender immediately and permanently delete this email and any attachments. No employee or agent of TiVo Inc. is authorized to conclude any binding agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo Inc. may only be made by a signed written agreement.
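Jagadish notes that Samza 0.12 binaries are compiled using Java 8 and cannot run on older runtimes. A tiny pre-upgrade check (the class `JavaVersionCheck` is hypothetical) reads the `java.specification.version` system property, which is "1.8" on Java 8 and "9", "11" and so on from Java 9 onward:

```java
// Hypothetical pre-upgrade check: Samza 0.12 binaries target Java 8, per the thread.
class JavaVersionCheck {

    // Maps java.specification.version to a major version:
    // "1.8" -> 8 (pre-Java 9 scheme), "11" -> 11 (Java 9+ scheme).
    static int majorVersion(String spec) {
        String major = spec.startsWith("1.") ? spec.substring(2) : spec;
        return Integer.parseInt(major);
    }

    static boolean canRunSamza012(String spec) {
        return majorVersion(spec) >= 8;
    }

    public static void main(String[] args) {
        String spec = System.getProperty("java.specification.version");
        System.out.println("Java " + majorVersion(spec) + ", can run Samza 0.12: " + canRunSamza012(spec));
    }
}
```

Running it with the same java binary the YARN NodeManagers use to launch containers (an assumption about how the cluster is set up) would show whether that host is ready for the upgrade.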
Steps to Upgrading Samza (0.9 to 0.12)
Hi, What are the general steps for upgrading Samza from 0.9 to 0.12? Do I need to upgrade Kafka and/or YARN? I don't know how Samza was set up initially, but we currently have the following setup: Samza version: 0.9.1 YARN version: Hadoop 2.6.0-cdh5.4.8 Kafka version: 0.9.0.1 I think the installation of Kafka and YARN was managed through Puppet. I'm extremely new to the operations side of Samza. I'm not sure what information would be relevant in this case, so please ask away. Thanks, Xiaochuan Yu