RE: Relaunching init
At the moment I am using version 0.9.1, and I feel that this will be the solution when I switch to the next version of my application. I have the project in a "don't touch if not really needed" status.

Thanks,
Jordi

-----Original Message-----
From: Navina Ramesh [mailto:nram...@linkedin.com.INVALID]
Sent: Thursday, October 8, 2015 19:31
To: dev@samza.apache.org
Subject: Re: Relaunching init

Hi Jordi,

The init() method is called only once during container start-up and is not called again until the container is restarted. Semantically, it doesn't seem fitting to make "init" callable more than once.

With Samza 0.10 (that is, the current master branch), you can pass configuration via a broadcast stream. You can configure one system stream partition to be consumed by all stream tasks. This way, whenever you write to the broadcast stream, you can reset or modify your config. I don't think we have documentation for this on the website yet.

Here is how you configure a broadcast stream. Let's say your broadcast stream is a Kafka stream called "my-broadcast-stream" with 1 partition. You can configure your job with:

task.global.inputs=kafka.my-broadcast-stream#0

The format is $system-name.$stream-name#$partition-number. You can also make multiple partitions broadcast by specifying a range; the format is $system-name.$stream-name#[$partitionStart-$partitionEnd]. In that case, if your broadcast stream has 3 partitions that you want to broadcast, your configuration looks like:

task.global.inputs=kafka.my-broadcast-stream#[0-2]

Data from the broadcast stream will be available like any other input stream in the process() method. You may have to change your app logic to handle the config change.

This is one way of satisfying your use case. Let me know if this solution works for you.

Thanks!
Navina

On Thu, Oct 8, 2015 at 2:45 AM, Jordi Blasi Uribarri wrote:
> Hi,
>
> Is there a way of forcing the init of a job to be rerun without
> killing the job and restarting it?
>
> I have some configurations loaded in a job that are obtained in the
> public void init(Config config, TaskContext context) function. When
> the configuration changes and I need it to be reapplied, the only way I
> find is to completely kill the job and start it again. This is a process
> that takes quite a long time. I don't really know why, but I can wait
> several minutes until I have the messages flowing again.
>
> Is there a way to do this in any other way? I am thinking of some kind
> of kill -HUP like in BIND servers (ok, I had a time as sysadmin ☺) or
> maybe a way of programmatically telling a running job to reload.
>
> Thanks,
>
> Jordi
>
> Jordi Blasi Uribarri
> Área I+D+i
>
> jbl...@nextel.es
> Oficina Bilbao
>
> [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]
--
Navina R.
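As an aside, the partition-range syntax Navina describes can be illustrated with a small standalone parser. This is not Samza code; it is purely illustrative of the $system-name.$stream-name#$partition-number and #[$partitionStart-$partitionEnd] formats, and the class name is made up:

```java
import java.util.ArrayList;
import java.util.List;

public class BroadcastInputFormat {
    // Parse the partition part of a task.global.inputs entry:
    // either "#N" (single partition) or "#[start-end]" (inclusive range).
    static List<Integer> partitions(String input) {
        String spec = input.substring(input.indexOf('#') + 1);
        List<Integer> parts = new ArrayList<>();
        if (spec.startsWith("[")) {
            // Range form: strip the brackets, split on '-', expand inclusively.
            String[] range = spec.substring(1, spec.length() - 1).split("-");
            for (int p = Integer.parseInt(range[0]); p <= Integer.parseInt(range[1]); p++) {
                parts.add(p);
            }
        } else {
            // Single-partition form.
            parts.add(Integer.parseInt(spec));
        }
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(partitions("kafka.my-broadcast-stream#0"));      // prints [0]
        System.out.println(partitions("kafka.my-broadcast-stream#[0-2]"));  // prints [0, 1, 2]
    }
}
```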
Relaunching init
Hi,

Is there a way of forcing the init of a job to be rerun without killing the job and restarting it?

I have some configurations loaded in a job that are obtained in the public void init(Config config, TaskContext context) function. When the configuration changes and I need it to be reapplied, the only way I find is to completely kill the job and start it again. This is a process that takes quite a long time. I don't really know why, but I can wait several minutes until I have the messages flowing again.

Is there a way to do this in any other way? I am thinking of some kind of kill -HUP like in BIND servers (ok, I had a time as sysadmin ☺) or maybe a way of programmatically telling a running job to reload.

Thanks,

Jordi

Jordi Blasi Uribarri
Área I+D+i
jbl...@nextel.es
Oficina Bilbao
RE: Versioning question
I finally got it working. I had a communication problem with Kafka, and it looks like the topics were corrupt, so the job launching process was not able to get them running. After a Kafka reinstallation and new topics, I now have 3 jobs running.

Thanks,
Jordi

-----Original Message-----
From: Yi Pan [mailto:nickpa...@gmail.com]
Sent: Wednesday, September 30, 2015 15:29
To: dev@samza.apache.org
Subject: Re: Versioning question

Hi, Jordi,

Yeah, first to clarify: master (trunk) is beyond the current release. You should be able to find a branch 0.9.1 that is actually the current released version. The following combination is verified:

1) JDK 7
2) Scala 2.10
3) YARN 2.6
4) Kafka 0.8.2.1

I would recommend cleaning up your installed copies and having the above set of components installed before you proceed.

Thanks!
-Yi

On Tue, Sep 29, 2015 at 2:42 AM, Jordi Blasi Uribarri wrote:
> I was going through files I have in the system and I found some
> strange things that made me wonder about correct versioning. I am not
> sure, but it may be the origin of the problems I see.
>
> When I installed Samza I ran the following:
>
> git clone http://git-wip-us.apache.org/repos/asf/samza.git
> cd samza
> ./gradlew clean build
>
> I thought that this was getting the last stable version (0.9.1), but I
> have found that the compiled jars in my installation are named like this:
>
> samza-kv-rocksdb_2.10-0.10.0-SNAPSHOT.jar
> samza-core_2.10-0.10.0-SNAPSHOT.jar
> samza-api-0.10.0-SNAPSHOT.jar
>
> So, as I understand it, I have the Samza 0.10.0 version installed, built
> for Scala 2.10. Is this correct?
>
> Kafka is installed with Scala 2.9. I am not sure if this is the
> conflicting element. Should I go to 2.10?
>
> Hadoop and YARN are version 2.6.0. Should I go to 2.7.1?
>
> I have tried to place in the lib folder the jar files generated in the
> Samza compilation (2.10-0.10.0) and they show completely different
> messages. Now I see an error that asks for a new library that I cannot
> locate.
>
> Exception in thread "main" java.lang.NoClassDefFoundError: joptsimple/OptionSpec
>         at org.apache.samza.job.JobRunner$.main(JobRunner.scala:39)
>         at org.apache.samza.job.JobRunner.main(JobRunner.scala)
> Caused by: java.lang.ClassNotFoundException: joptsimple.OptionSpec
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>
> Do I have a versioning issue? Which library does this class belong to?
>
> Thanks
>
> Jordi
>
> Jordi Blasi Uribarri
> Área I+D+i
> jbl...@nextel.es
> Oficina Bilbao
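To answer the question in the quoted message: joptsimple.OptionSpec belongs to the jopt-simple library (net.sf.jopt-simple), which Samza's command-line parsing in JobRunner depends on. A hedged build fragment follows; the version shown is an assumption (it matches what Kafka 0.8.2-era stacks commonly shipped), so verify it against your Samza build's dependency list:

```groovy
// Hypothetical build.gradle fragment for the job package. Alternatively, the
// jopt-simple jar can simply be dropped into the package's lib/ directory,
// since Samza's run scripts build the classpath from lib/*.jar.
dependencies {
    runtime 'net.sf.jopt-simple:jopt-simple:3.2'
}
```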
Versioning question
I was going through files I have in the system and I found some strange things that made me wonder about correct versioning. I am not sure, but it may be the origin of the problems I see.

When I installed Samza I ran the following:

git clone http://git-wip-us.apache.org/repos/asf/samza.git
cd samza
./gradlew clean build

I thought that this was getting the last stable version (0.9.1), but I have found that the compiled jars in my installation are named like this:

samza-kv-rocksdb_2.10-0.10.0-SNAPSHOT.jar
samza-core_2.10-0.10.0-SNAPSHOT.jar
samza-api-0.10.0-SNAPSHOT.jar

So, as I understand it, I have the Samza 0.10.0 version installed, built for Scala 2.10. Is this correct?

Kafka is installed with Scala 2.9. I am not sure if this is the conflicting element. Should I go to 2.10?

Hadoop and YARN are version 2.6.0. Should I go to 2.7.1?

I have tried to place in the lib folder the jar files generated in the Samza compilation (2.10-0.10.0) and they show completely different messages. Now I see an error that asks for a new library that I cannot locate:

Exception in thread "main" java.lang.NoClassDefFoundError: joptsimple/OptionSpec
        at org.apache.samza.job.JobRunner$.main(JobRunner.scala:39)
        at org.apache.samza.job.JobRunner.main(JobRunner.scala)
Caused by: java.lang.ClassNotFoundException: joptsimple.OptionSpec
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

Do I have a versioning issue? Which library does this class belong to?

Thanks

Jordi
________
Jordi Blasi Uribarri
Área I+D+i
jbl...@nextel.es
Oficina Bilbao
RE: process killing
Just some basic questions to see if I am understanding correctly:

- When you ask for the SamzaAppMaster log, do you mean the container that ends with _01? The logs that I find in the userlogs folder do not say much: stdout shows the command launched, stderr shows nothing, and gc.log.0 has some lines like this:

2015-09-28T17:51:51.556+0200: 2,479: [GC 28273K->8028K(77312K), 0,0124110 secs]
2015-09-28T17:51:51.568+0200: 2,492: [Full GC 8028K->4959K(77312K), 0,1294490 secs]
2015-09-28T18:24:32.285+0200: 1963,208: [GC 37727K->8047K(77312K), 0,0087100 secs]
2015-09-28T18:51:51.699+0200: 3602,623: [GC 22599K->5977K(77312K), 0,0065030 secs]
2015-09-28T18:51:51.706+0200: 3602,629: [Full GC 5977K->2445K(77312K), 0,0914750 secs]
2015-09-28T19:51:51.799+0200: 7202,722: [GC 33571K->2541K(110080K), 0,0032800 secs]
2015-09-28T19:51:51.802+0200: 7202,725: [Full GC 2541K->2457K(110080K), 0,0921670 secs]

- I reinstalled the servers (following the process I described in the other mail) and the problem persists, so I am not sure where the problem is. When you speak of leaking orphaned processes, do you mean a process running in memory, or some kind of disk-level corruption? The first one should be solved with a shutdown, but the second one may imply locating some files and deleting them.

- I am stuck with the problem, so I may say I am reproducing it. What should I capture to send to them?

Thanks,
Jordi

-----Original Message-----
From: Yi Pan [mailto:nickpa...@gmail.com]
Sent: Monday, September 28, 2015 20:04
To: dev@samza.apache.org
Subject: Re: process killing

What did the SamzaAppMaster log say? It seems to me that the YARN RM lost the state of the AppMaster instances. We have seen this in very rare cases at LinkedIn (about once or twice a year), where YARN leaks some orphaned processes, but we have really had no chance to reliably reproduce it. If you see this repeatedly, it would be nice to capture the reproduction sequence and provide it to the YARN community for help. Please keep us cc'ed as well.

Thanks!
-Yi

On Mon, Sep 28, 2015 at 2:33 AM, Jordi Blasi Uribarri wrote:
> This is an excerpt from the yarn-root-resourcemanager-kfk-samza01.out
> file. Tell me if you need another file.
>
> Thanks,
>
> jordi
>
> 0001 State change from NEW to SUBMITTED
> 2015-09-28 11:15:58,081 INFO [ResourceManager Event Processor] capacity.LeafQueue (LeafQueue.java:activateApplications(626)) - Application application_1443431699703_0003 from user: root activated in queue: default
> 2015-09-28 11:15:58,081 INFO [ResourceManager Event Processor] capacity.LeafQueue (LeafQueue.java:addApplicationAttempt(643)) - Application added - appId: application_1443431699703_0003 user: org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue$User@3d3c59a, leaf-queue: default #user-pending-applications: 0 #user-active-applications: 3 #queue-pending-applications: 0 #queue-active-applications: 3
> 2015-09-28 11:15:58,081 INFO [ResourceManager Event Processor] capacity.CapacityScheduler (CapacityScheduler.java:addApplicationAttempt(746)) - Added Application Attempt appattempt_1443431699703_0003_01 to scheduler from user root in queue default
> 2015-09-28 11:15:58,082 INFO [AsyncDispatcher event handler] attempt.RMAppAttemptImpl (RMAppAttemptImpl.java:handle(764)) - appattempt_1443431699703_0003_01 State change from SUBMITTED to SCHEDULED
> 2015-09-28 11:15:58,135 INFO [ResourceManager Event Processor] rmcontainer.RMContainerImpl (RMContainerImpl.java:handle(380)) - container_1443431699703_0003_01_01 Container Transitioned from NEW to ALLOCATED
> 2015-09-28 11:15:58,135 INFO [ResourceManager Event Processor] resourcemanager.RMAuditLogger (RMAuditLogger.java:logSuccess(106)) - USER=root OPERATION=AM Allocated Container TARGET=SchedulerApp RESULT=SUCCESS APPID=application_1443431699703_0003 CONTAINERID=container_1443431699703_0003_01_01
> 2015-09-28 11:15:58,135 INFO [ResourceManager Event Processor] scheduler.SchedulerNode (SchedulerNode.java:allocateContainer(141)) - Assigned container container_1443431699703_0003_01_01 of capacity on host kfk-samza01:36066, which has 3 containers, used and available after allocation
> 2015-09-28 11:15:58,136 INFO [ResourceManager Event Processor] capacity.LeafQueue (LeafQueue.java:assignContainer(1570)) - assignedContainer application attempt=appattempt_1443431699703_0003_01 container=Container: [ContainerId: container_1443431699703_0003_01_01, NodeId: kfk-samza01:36066, NodeHttpAddress: kfk-samza01:8042, Resource: , Priority: 0, Token: null, ] queue=default: capacity=1.0, absoluteCapacity=1.0, usedResources=, usedCapacity=0.125, absoluteUsedCapacity=0.125, numApps=3, numContainers=2 clusterResource=
> 20
RE: container is running beyond virtual memory limits
With -Xms128M -Xmx128M I still see that it runs with -Xmx768M. I have even changed the run-class.sh script, but it does not change. Some of the things I am describing do not make sense to me, so I am lost on what to do or where to look.

Thanks for your help,
Jordi

-----Original Message-----
From: Jordi Blasi Uribarri [mailto:jbl...@nextel.es]
Sent: Monday, September 28, 2015 11:26
To: dev@samza.apache.org
Subject: RE: container is running beyond virtual memory limits

I just changed the task options file to add the following line:

task.opts=-Xmx128M

And I found no change in the behaviour. I see that the job is still being launched with the default -Xmx768M value:

root 8296 8294 1 11:16 ? 00:00:05 /usr/lib/jvm/java-7-openjdk-amd64/bin/java -server -Dsamza.container.name=samza-application-master -Dsamza.log.dir=/opt/hadoop-2.6.0/logs/userlogs/application_1443431699703_0003/container_1443431699703_0003_01_01 -Djava.io.tmpdir=/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1443431699703_0003/container_1443431699703_0003_01_01/__package/tmp -Xmx768M -XX:+PrintGCDateStamps -Xloggc:/opt/hadoop-2.6.0/logs/userlogs/application_1443431699703_0003/container_1443431699703_0003_01_01/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024 -d64 -cp /opt/hadoop-2.6.0/conf:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1443431699703_0003/container_1443431699703_0003_01_01/__package/lib/DataAnalyzer-0.0.1.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1443431699703_0003/container_1443431699703_0003_01_01/__package/lib/DataAnalyzer-0.0.1-jar-with-dependencies.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1443431699703_0003/container_1443431699703_0003_01_01/__package/lib/jackson-annotations-2.6.0.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1443431699703_0003/container_1443431699703_0003_01_01/__package/lib/jackson-core-2.6.0.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1443431699703_0003/container_1443431699703_0003_01_01/__package/lib/jackson-databind-2.6.0.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1443431699703_0003/container_1443431699703_0003_01_01/__package/lib/jackson-dataformat-smile-2.6.0.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1443431699703_0003/container_1443431699703_0003_01_01/__package/lib/jackson-jaxrs-json-provider-2.6.0.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1443431699703_0003/container_1443431699703_0003_01_01/__package/lib/jackson-module-jaxb-annotations-2.6.0.jar org.apache.samza.job.yarn.SamzaAppMaster

How do I set the correct value?

Thanks,
Jordi

-----Original Message-----
From: Yi Pan [mailto:nickpa...@gmail.com]
Sent: Monday, September 28, 2015 10:56
To: dev@samza.apache.org
Subject: Re: container is running beyond virtual memory limits

Hi, Jordi,

Please find the config variable task.opts in this table:
http://samza.apache.org/learn/documentation/0.9/jobs/configuration-table.html

This allows you to add additional JVM opts when launching the containers.

-Yi

On Mon, Sep 28, 2015 at 1:48 AM, Jordi Blasi Uribarri wrote:
> The three tasks have a similar options file, like this one.
>
> task.class=flow.OperationJob
> job.name=flow.OperationJob
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> yarn.package.path=http://IP/javaapp.tar.gz
>
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.consumer.zookeeper.connect=kfk-kafka01:2181,kfk-kafka02:2181
> systems.kafka.producer.bootstrap.servers=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:9093
> systems.kafka.producer.metadata.broker.list=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:909
>
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> task.checkpoint.system=kafka
> task.inputs=kafka.operationtpc
>
> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
> systems.kafka.samza.msg.serde=string
> systems.kafka.streams.tracetpc.samza.msg.serde=json
>
> yarn.container.memory.mb=256
> yarn.am.container.memory.mb=256
>
> task.commit.ms=1000
> task.window.ms=6
>
> Where do I have to change the XMX parameter?
>
> Thanks.
>
> Jordi
>
> -----Original Message-----
> From: Yi Pan [mailto:nickpa...@gmail.com]
> Sent: Monday, September 28, 2015 10:39
> To: dev@samza.apache.org
> Subject: Re: container is running beyond virtual memory limits
>
> Hi, Jordi,
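One likely explanation for the stuck -Xmx768M in this thread, based on the ps output quoted above: the observed process is the ApplicationMaster (org.apache.samza.job.yarn.SamzaAppMaster), and task.opts applies only to the SamzaContainer JVMs, not to the AM. A hedged sketch of the job config follows; yarn.am.opts is the AM-side setting listed in Samza's configuration table, but check the table for the exact version you run:

```properties
# Container JVMs (the processes that run your StreamTask code):
task.opts=-Xms128M -Xmx128M

# ApplicationMaster JVM (the -Xmx768M process observed in the ps output):
yarn.am.opts=-Xmx128M
```

In both cases the heap should stay below the corresponding yarn.container.memory.mb / yarn.am.container.memory.mb values, as Yi notes above.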
RE: process killing
LED
2015-09-28 11:22:23,170 INFO [ResourceManager Event Processor] capacity.LeafQueue (LeafQueue.java:removeApplicationAttempt(686)) - Application removed - appId: application_1443431699703_0002 user: root queue: default #user-pending-applications: 0 #user-active-applications: 0 #queue-pending-applications: 0 #queue-active-applications: 0
2015-09-28 11:22:23,172 INFO [ResourceManager Event Processor] capacity.ParentQueue (ParentQueue.java:removeApplication(411)) - Application removed - appId: application_1443431699703_0002 user: root leaf-queue of parent: root #applications: 0
2015-09-28 11:22:23,184 INFO [IPC Server handler 20 on 8030] resourcemanager.ApplicationMasterService (ApplicationMasterService.java:finishApplicationMaster(351)) - application_1443431699703_0001 unregistered successfully.
2015-09-28 11:22:23,371 INFO [IPC Server handler 0 on 8032] resourcemanager.RMAuditLogger (RMAuditLogger.java:logSuccess(148)) - USER=root IP=192.168.15.92 OPERATION=Kill Application Request TARGET=ClientRMService RESULT=SUCCESS APPID=application_1443431699703_0002
2015-09-28 11:22:24,195 INFO [ResourceManager Event Processor] capacity.CapacityScheduler (CapacityScheduler.java:completedContainer(1190)) - Null container completed...
2015-09-28 11:24:59,693 INFO [Timer-3] scheduler.AbstractYarnScheduler (AbstractYarnScheduler.java:run(407)) - Release request cache is cleaned up

-----Original Message-----
From: Yi Pan [mailto:nickpa...@gmail.com]
Sent: Monday, September 28, 2015 10:37
To: dev@samza.apache.org
Subject: Re: process killing

Hm... interesting. What did you see in the application master's logs? I saw that the remaining processes running are SamzaAppMasters.

On Tue, Sep 22, 2015 at 1:05 AM, Jordi Blasi Uribarri wrote:
> Hi,
>
> I have two machines running YARN and Samza. They are Samza 0.9.1 and
> Hadoop 2.6.0.
>
> I ran the kill-all.sh script I recently wrote, which calls kill-yarn-job.sh.
> This is the output:
>
> java version "1.7.0_79"
> OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-1~deb7u1)
> OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in [jar:file:/opt/jobs/lib/redirect-0.0.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/opt/jobs/lib/samzafroga-0.0.1-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> java version "1.7.0_79"
> OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-1~deb7u1)
> OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)
> /usr/lib/jvm/java-7-openjdk-amd64/bin/java -Dlog4j.configuration=file:bin/log4j-console.xml -Dsamza.log.dir=/opt/jobs -Djava.io.tmpdir=/opt/jobs/tmp -Xmx768M -XX:+PrintGCDateStamps -Xloggc:/opt/jobs/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024 -d64 -cp /opt/hadoop-2.6.0/conf:/opt/jobs/lib/redirect-0.0.1.jar:/opt/jobs/lib/samzafroga-0.0.1-jar-with-dependencies.jar org.apache.hadoop.yarn.client.cli.ApplicationCLI application -kill application_1442908447829_0001
> 2015-09-22 10:02:46 RMProxy [INFO] Connecting to ResourceManager at kfk-samza01/192.168.15.92:8032
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in [jar:file:/opt/jobs/lib/redirect-0.0.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/opt/jobs/lib/samzafroga-0.0.1-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> 2015-09-22 10:02:46 NativeCodeLoader [WARN] Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> Killing application application_1442908447829_0001
> 2015-09-22 10:02:47 YarnClientImpl [INFO] Killed application application_1442908447829_0001
> java version "1.7.0_79"
> OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-1~deb7u1)
> OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)
> /usr/lib/jvm/java-7-openjdk-amd64/bin/java -Dlog4j.configuration=file:bin/log4j-console.xml -Dsamza.log.dir=/opt/jobs -Djava.io.tmpdir=/opt/jobs/tmp -Xmx768M -XX:+PrintGCDateStamps -Xloggc:/opt/jobs/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024 -d64 -cp /opt/hadoop-2.6.0/conf:/opt/jobs/lib/redirect-0.0.1.jar:/opt/jobs/lib/samzafroga-0.0.1-jar-with-dependencies.jar org.apache.hadoop.yarn.client.cli.ApplicationCLI application -kill application_1442908447829_0002
> 2015-09-22 10:02:49 RMProxy [INFO] Connecting to ResourceManager a
RE: container is running beyond virtual memory limits
I just changed the task options file to add the following line:

task.opts=-Xmx128M

And I found no change in the behaviour. I see that the job is still being launched with the default -Xmx768M value:

root 8296 8294 1 11:16 ? 00:00:05 /usr/lib/jvm/java-7-openjdk-amd64/bin/java -server -Dsamza.container.name=samza-application-master -Dsamza.log.dir=/opt/hadoop-2.6.0/logs/userlogs/application_1443431699703_0003/container_1443431699703_0003_01_01 -Djava.io.tmpdir=/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1443431699703_0003/container_1443431699703_0003_01_01/__package/tmp -Xmx768M -XX:+PrintGCDateStamps -Xloggc:/opt/hadoop-2.6.0/logs/userlogs/application_1443431699703_0003/container_1443431699703_0003_01_01/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024 -d64 -cp /opt/hadoop-2.6.0/conf:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1443431699703_0003/container_1443431699703_0003_01_01/__package/lib/DataAnalyzer-0.0.1.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1443431699703_0003/container_1443431699703_0003_01_01/__package/lib/DataAnalyzer-0.0.1-jar-with-dependencies.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1443431699703_0003/container_1443431699703_0003_01_01/__package/lib/jackson-annotations-2.6.0.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1443431699703_0003/container_1443431699703_0003_01_01/__package/lib/jackson-core-2.6.0.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1443431699703_0003/container_1443431699703_0003_01_01/__package/lib/jackson-databind-2.6.0.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1443431699703_0003/container_1443431699703_0003_01_01/__package/lib/jackson-dataformat-smile-2.6.0.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1443431699703_0003/container_1443431699703_0003_01_01/__package/lib/jackson-jaxrs-json-provider-2.6.0.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1443431699703_0003/container_1443431699703_0003_01_01/__package/lib/jackson-module-jaxb-annotations-2.6.0.jar org.apache.samza.job.yarn.SamzaAppMaster

How do I set the correct value?

Thanks,
Jordi

-----Original Message-----
From: Yi Pan [mailto:nickpa...@gmail.com]
Sent: Monday, September 28, 2015 10:56
To: dev@samza.apache.org
Subject: Re: container is running beyond virtual memory limits

Hi, Jordi,

Please find the config variable task.opts in this table:
http://samza.apache.org/learn/documentation/0.9/jobs/configuration-table.html

This allows you to add additional JVM opts when launching the containers.

-Yi

On Mon, Sep 28, 2015 at 1:48 AM, Jordi Blasi Uribarri wrote:
> The three tasks have a similar options file, like this one.
>
> task.class=flow.OperationJob
> job.name=flow.OperationJob
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> yarn.package.path=http://IP/javaapp.tar.gz
>
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.consumer.zookeeper.connect=kfk-kafka01:2181,kfk-kafka02:2181
> systems.kafka.producer.bootstrap.servers=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:9093
> systems.kafka.producer.metadata.broker.list=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:909
>
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> task.checkpoint.system=kafka
> task.inputs=kafka.operationtpc
>
> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
> systems.kafka.samza.msg.serde=string
> systems.kafka.streams.tracetpc.samza.msg.serde=json
>
> yarn.container.memory.mb=256
> yarn.am.container.memory.mb=256
>
> task.commit.ms=1000
> task.window.ms=6
>
> Where do I have to change the XMX parameter?
>
> Thanks.
>
> Jordi
>
> -----Original Message-----
> From: Yi Pan [mailto:nickpa...@gmail.com]
> Sent: Monday, September 28, 2015 10:39
> To: dev@samza.apache.org
> Subject: Re: container is running beyond virtual memory limits
>
> Hi, Jordi,
>
> Can you post your task.opts settings as well? The Xms and Xmx JVM opts
> will play a role here as well. The Xmx size should be set to less than
> yarn.container.memory.mb.
>
> -Yi
>
> On Tue, Sep 22, 2015 at 4:32 AM, Jordi Blasi Uribarri wrote:
>
> > I am seeing that I cannot get even a single job running. I have
> > recovered the original configuration of yarn-site.xml and
> > capacity-scheduler.xml and that does not work
RE: limits configuration
I copy the same file to both nodes on every change. I understand that that is the correct way of doing it.

-----Original Message-----
From: Yi Pan [mailto:nickpa...@gmail.com]
Sent: Monday, September 28, 2015 10:50
To: dev@samza.apache.org
Subject: Re: limits configuration

Just out of curiosity, did you change the yarn-site.xml just for the RM, or for the NMs as well?

On Fri, Sep 25, 2015 at 3:23 AM, Jordi Blasi Uribarri wrote:
> Hi,
>
> I think that this is more a YARN question than a Samza one, but I am
> really stuck on this. I am having problems understanding some (I
> believe) basic concepts of resource management, and so I cannot get
> things working.
>
> I have two virtual machines with 2 CPUs and 2048 MB RAM each. I have
> Debian installed on them and Samza over it. I am trying to get three
> Samza jobs working on my cluster, and I expect that I will have to run
> some more in the future (at least two).
>
> When I start YARN I see in the web administration that I am offered a
> total amount of 16 GB of RAM and 16 vcores, which does not make sense
> to me given what I have assigned.
>
> I have configured yarn-site.xml with the following:
>
> <property>
>   <name>yarn.scheduler.minimum-allocation-mb</name>
>   <value>128</value>
> </property>
> <property>
>   <name>yarn.scheduler.maximum-allocation-mb</name>
>   <value>2048</value>
> </property>
> <property>
>   <name>yarn.scheduler.minimum-allocation-vcores</name>
>   <value>1</value>
> </property>
> <property>
>   <name>yarn.scheduler.maximum-allocation-vcores</name>
>   <value>2</value>
> </property>
>
> I changed this value in capacity-scheduler.xml (from 0.1):
>
> <property>
>   <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
>   <value>0.5</value>
> </property>
>
> The three jobs have the following:
>
> yarn.container.memory.mb=256
> yarn.am.container.memory.mb=256
>
> When I launch the jobs I can only get one into the RUNNING state. In
> fact, and this surprises me, even when I launch them individually only
> one of them gets to RUNNING. At the moment I have no way of running
> two of them. I have tried to change the values above, in different
> combinations, but got no result.
>
> I have not seen any error in the logs.
>
> What is preventing the jobs from getting to the RUNNING state?
>
> Thanks.
>
> Jordi
>
> Jordi Blasi Uribarri
> Área I+D+i
> jbl...@nextel.es
> Oficina Bilbao
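A possible explanation for the "16 GB of RAM and 16 vcores" puzzle in the quoted message: the RM web UI sums each NodeManager's advertised resources, and those default to 8192 MB and 8 vcores per NM regardless of the actual hardware, so two NMs show up as 16 GB / 16 vcores. A hedged yarn-site.xml sketch for both nodes follows; the values are illustrative for a 2 GB / 2-CPU VM:

```xml
<!-- Advertise what each NodeManager can actually offer; YARN does not
     auto-detect this from the machine. -->
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <!-- Below physical RAM, leaving headroom for the OS and the NM itself. -->
  <value>1536</value>
</property>
<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>2</value>
</property>
```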
RE: container is running beyond virtual memory limits
The three tasks have a similar options file, like this one:

task.class=flow.OperationJob
job.name=flow.OperationJob
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
yarn.package.path=http://IP/javaapp.tar.gz

systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=kfk-kafka01:2181,kfk-kafka02:2181
systems.kafka.producer.bootstrap.servers=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:9093
systems.kafka.producer.metadata.broker.list=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:909

task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.inputs=kafka.operationtpc

serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

systems.kafka.samza.msg.serde=string
systems.kafka.streams.tracetpc.samza.msg.serde=json

yarn.container.memory.mb=256
yarn.am.container.memory.mb=256

task.commit.ms=1000
task.window.ms=6

Where do I have to change the XMX parameter?

Thanks.

Jordi

-----Original Message-----
From: Yi Pan [mailto:nickpa...@gmail.com]
Sent: Monday, September 28, 2015 10:39
To: dev@samza.apache.org
Subject: Re: container is running beyond virtual memory limits

Hi, Jordi,

Can you post your task.opts settings as well? The Xms and Xmx JVM opts will play a role here as well. The Xmx size should be set to less than yarn.container.memory.mb.

-Yi

On Tue, Sep 22, 2015 at 4:32 AM, Jordi Blasi Uribarri wrote:
> I am seeing that I cannot get even a single job running. I have
> recovered the original configuration of yarn-site.xml and
> capacity-scheduler.xml and that does not work. I am thinking that
> maybe there is some kind of information related to old jobs that has
> not been correctly cleaned up when killing them. Is there any place
> where I can look to remove temporary files or something similar?
>
> Thanks
>
> jordi
>
> -----Original Message-----
> From: Jordi Blasi Uribarri [mailto:jbl...@nextel.es]
> Sent: Tuesday, September 22, 2015 10:06
> To: dev@samza.apache.org
> Subject: container is running beyond virtual memory limits
>
> Hi,
>
> I am not really sure if this is related to any of the previous
> questions, so I am asking it in a new message. I am running three
> different Samza jobs that perform different actions and interchange
> information. As I found limits in the memory that were preventing the
> jobs from getting from ACCEPTED to RUNNING, I introduced some
> configuration in YARN, as suggested on this list:
>
> yarn-site.xml:
>
> <property>
>   <name>yarn.scheduler.minimum-allocation-mb</name>
>   <value>128</value>
>   <description>Minimum limit of memory to allocate to each container
>   request at the Resource Manager.</description>
> </property>
> <property>
>   <name>yarn.scheduler.maximum-allocation-mb</name>
>   <value>512</value>
>   <description>Maximum limit of memory to allocate to each container
>   request at the Resource Manager.</description>
> </property>
> <property>
>   <name>yarn.scheduler.minimum-allocation-vcores</name>
>   <value>1</value>
>   <description>The minimum allocation for every container request at
>   the RM, in terms of virtual CPU cores. Requests lower than this won't
>   take effect, and the specified value will get allocated the
>   minimum.</description>
> </property>
> <property>
>   <name>yarn.scheduler.maximum-allocation-vcores</name>
>   <value>2</value>
>   <description>The maximum allocation for every container request at
>   the RM, in terms of virtual CPU cores. Requests higher than this won't
>   take effect, and will get capped to this value.</description>
> </property>
> <property>
>   <name>yarn.resourcemanager.hostname</name>
>   <value>kfk-samza01</value>
> </property>
>
> capacity-scheduler.xml (altered value):
>
> <property>
>   <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
>   <value>0.5</value>
>   <description>Maximum percent of resources in the cluster which can be
>   used to run application masters, i.e. controls number of concurrent
>   running applications.</description>
> </property>
>
> The jobs are configured to reduce the memory usage:
>
> yarn.container.memory.mb=256
> yarn.am.container.memory.mb=256
>
> After introducing these changes I experienced a very appreciable
> reduction in speed.
It seemed normal as the memory assigned to the > jobs was lowered and there were more running. It was running until > yesterday but today I am seeing that > > What I have seen today is that they are not moving from ACCEPTED to > RUNNING. I have found the following in the log (full log at the end): > > 2015-09-22 09:54:36,661 INFO [Container Monitor] > monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(408)) - > Memory usage of ProcessTree 10346 for container-id > container_1442908447829_0001_01_01: 70.0 MB of 256 MB physical > memory used; 1.2 GB
limits configuration
Hi, I think that this is more a YARN question than a Samza one, but I am really stuck on this. I am having problems understanding some (I believe) basic concepts of resource management, and so I cannot get things working. I have two virtual machines with 2 CPUs and 2048 MB RAM each. I have Debian installed on them and Samza over it. I am trying to get three Samza jobs working on my cluster, and I expect that I will have to run some more in the future (at least two). When I start YARN I see in the web administration that I am offered a total amount of 16 GB of RAM and 16 vcores, which does not make sense to me given what I have assigned. I have configured yarn-site.xml with the following:

yarn.scheduler.minimum-allocation-mb 128
yarn.scheduler.maximum-allocation-mb 2048
yarn.scheduler.minimum-allocation-vcores 1
yarn.scheduler.maximum-allocation-vcores 2

I changed this value in capacity-scheduler.xml (from 0.1):

yarn.scheduler.capacity.maximum-am-resource-percent 0.5

The three jobs have the following:

yarn.container.memory.mb=256
yarn.am.container.memory.mb=256

When I launch the jobs I can only get one into the RUNNING state. In fact, and this surprises me, even when I launch them individually only one of them gets to RUNNING. At this moment I have no way of running two of them. I have tried moving the values above in different combinations but got no result. I have not seen any error in the logs. What is preventing the jobs from getting to the RUNNING state? Thanks. Jordi

Jordi Blasi Uribarri Área I+D+i jbl...@nextel.es Oficina Bilbao [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]
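The 16 GB / 16 vcores figure is never explained in the thread; a plausible cause (an assumption, not confirmed here) is that each NodeManager advertises the defaults yarn.nodemanager.resource.memory-mb=8192 and yarn.nodemanager.resource.cpu-vcores=8 regardless of the physical hardware, so two nodes report 16 GB and 16 vcores in total. A yarn-site.xml sketch matching the 2 GB / 2 CPU machines described above:

```xml
<!-- Per-node resources YARN may hand out to containers. The defaults
     (8192 MB / 8 vcores) ignore the actual hardware, so on small VMs
     they should be set explicitly. -->
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>1536</value> <!-- leave ~512 MB for the OS and daemons -->
</property>
<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>2</value>
</property>
```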
RE: container is running beyond virtual memory limits
I am seeing that I can not get even a single job running. I have recovered the original configuration of yarn-site.xml and capacity-scheduler.xml and that does not work. I am thinking that maybe there is some kind of information related to old jobs that have not been correctly cleaned when killing them. Is there any place where I can look to remove temporary files or something similar? Thanks jordi -Mensaje original- De: Jordi Blasi Uribarri [mailto:jbl...@nextel.es] Enviado el: martes, 22 de septiembre de 2015 10:06 Para: dev@samza.apache.org Asunto: container is running beyond virtual memory limits Hi, I am not really sure If this is related to any of the previous questions so I am asking it in a new message. I am running three different samza jobs that perform different actions and interchange information. As I found limits in the memory that were preventing the jobs to get from Accepted to Running I introduced some configurations in Yarn, as suggested in this list: yarn-site.xml yarn.scheduler.minimum-allocation-mb 128 Minimum limit of memory to allocate to each container request at the Resource Manager. yarn.scheduler.maximum-allocation-mb 512 Maximum limit of memory to allocate to each container request at the Resource Manager. yarn.scheduler.minimum-allocation-vcores 1 The minimum allocation for every container request at the RM, in terms of virtual CPU cores. Requests lower than this won't take effect, and the specified value will get allocated the minimum. yarn.scheduler.maximum-allocation-vcores 2 The maximum allocation for every container request at the RM, in terms of virtual CPU cores. Requests higher than this won't take effect, and will get capped to this value. yarn.resourcemanager.hostname kfk-samza01 capacity-scheduler.xml Alter value yarn.scheduler.capacity.maximum-am-resource-percent 0.5 Maximum percent of resources in the cluster which can be used to run application masters i.e. controls number of concurrent running applications. 
The jobs are configured to reduce the memory usage:

yarn.container.memory.mb=256
yarn.am.container.memory.mb=256

After introducing these changes I experienced a very noticeable reduction in speed. It seemed normal, as the memory assigned to the jobs was lowered and there were more running. It was running until yesterday. What I have seen today is that they are not moving from ACCEPTED to RUNNING. I have found the following in the log (full log at the end):

2015-09-22 09:54:36,661 INFO [Container Monitor] monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(408)) - Memory usage of ProcessTree 10346 for container-id container_1442908447829_0001_01_01: 70.0 MB of 256 MB physical memory used; 1.2 GB of 537.6 MB virtual memory used

I am not sure where that 1.2 GB comes from that makes the processes die. Thanks, Jordi

2015-09-22 09:54:36,519 INFO [Container Monitor] monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(458)) - Removed ProcessTree with root 10271
2015-09-22 09:54:36,519 INFO [AsyncDispatcher event handler] container.Container (ContainerImpl.java:handle(999)) - Container container_1442908447829_0002_01_01 transitioned from RUNNING to KILLING
2015-09-22 09:54:36,533 INFO [AsyncDispatcher event handler] launcher.ContainerLaunch (ContainerLaunch.java:cleanupContainer(370)) - Cleaning up container container_1442908447829_0002_01_01
2015-09-22 09:54:36,661 INFO [Container Monitor] monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(408)) - Memory usage of ProcessTree 10346 for container-id container_1442908447829_0001_01_01: 70.0 MB of 256 MB physical memory used; 1.2 GB of 537.6 MB virtual memory used
2015-09-22 09:54:36,661 WARN [Container Monitor] monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:isProcessTreeOverLimit(293)) - Process tree for container: container_1442908447829_0001_01_01 running over twice the configured limit. 
Limit=563714432, current usage = 1269743616 2015-09-22 09:54:36,662 WARN [Container Monitor] monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(447)) - Container [pid=10346,containerID=container_1442908447829_0001_01_01] is running beyond virtual memory limits. Current usage: 70.0 MB of 256 MB physical memory used; 1.2 GB of 537.6 MB virtual memory used. Killing container. Dump of the process-tree for container_1442908447829_0001_01_01 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 10346 10344 10346 10346 (java) 253 7 1269743616 17908 /usr/lib/jvm/java-7-openjdk-amd64/bin/java -server -Dsamza.container.name=samza-application-master -Dsamza.log.dir=/opt/hadoop-2.6.0/logs/userlogs/application_14429084
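The numbers in the log are consistent with YARN's virtual-memory check: the 537.6 MB vmem limit is 256 MB × the default yarn.nodemanager.vmem-pmem-ratio of 2.1, and a JVM's large virtual address space trips it even though physical usage is only 70 MB. A yarn-site.xml sketch of the two knobs commonly adjusted for this (the fix itself is an assumption, not stated in the thread):

```xml
<!-- JVMs reserve large virtual-memory regions, so either disable the
     vmem check or raise the vmem-to-pmem ratio (default 2.1). -->
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>
<property>
  <name>yarn.nodemanager.vmem-pmem-ratio</name>
  <value>5</value> <!-- only consulted when the check is enabled -->
</property>
```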
container is running beyond virtual memory limits
tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1442908447829_0001/container_1442908447829_0001_01_01/__package/lib/jackson-core-2.6.0.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1442908447829_0001/container_1442908447829_0001_01_01/__package/lib/jackson-databind-2.6.0.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1442908447829_0001/container_1442908447829_0001_01_01/__package/lib/jackson-dataformat-smile-2.6.0.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1442908447829_0001/container_1442908447829_0001_01_01/__package/lib/jackson-jaxrs-json-provider-2.6.0.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1442908447829_0001/container_1442908447829_0001_01_01/__package/lib/jackson-module-jaxb-annotations-2.6.0.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1442908447829_0001/container_1442908447829_0001_01_01/__package/lib/nxtBroker-0.0.1.jar:/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1442908447829_0001/container_1442908447829_0001_01_01/__package/lib/nxtBroker-0.0.1-jar-with-dependencies.jar org.apache.samza.job.yarn.SamzaAppMaster 2015-09-22 09:54:36,663 INFO [Container Monitor] monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(458)) - Removed ProcessTree with root 10346 2015-09-22 09:54:36,663 INFO [AsyncDispatcher event handler] container.Container (ContainerImpl.java:handle(999)) - Container container_1442908447829_0001_01_01 transitioned from RUNNING to KILLING 2015-09-22 09:54:36,663 INFO [AsyncDispatcher event handler] launcher.ContainerLaunch (ContainerLaunch.java:cleanupContainer(370)) - Cleaning up container container_1442908447829_0001_01_01 ________ Jordi Blasi Uribarri Área I+D+i jbl...@nextel.es Oficina Bilbao [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]
RE: process killing
01/__package/lib/nxtBroker-0.0.1-jar-with-dependencies.jar org.apache.samza.job.yarn.SamzaAppMaster

As you can see, the processes are still there. In the web application they appear as KILLED. Thanks, Jordi

-----Original Message----- From: Yan Fang [mailto:yanfang...@gmail.com] Sent: Tuesday, 22 September 2015 9:59 To: dev@samza.apache.org Subject: Re: process killing

Hi Jordi, 1. Are you running the job on a single-machine YARN, or in a cluster? 2. What kind of Java process do you see after killing the YARN application? Because usually, when we run kill-yarn-job applicationId, we do kill all the processes (this is actually done by YARN). 3. Which version of Samza and YARN are you using? This matters sometimes. Thanks, Fang, Yan yanfang...@gmail.com

On Tue, Sep 22, 2015 at 3:42 PM, Jordi Blasi Uribarri wrote:
> Hi,
>
> I am currently developing a solution using Samza, and in the development
> process I need to constantly change the code and test it in the system.
> What I am seeing is that most of the time when I kill a job using the
> kill-yarn-job script, the job gets killed according to the web
> interface but I still see the Java process running. I have also seen that
> the job was actually still being executed, as I got messages at the far end
> of the application. I have been manually killing these processes (kill -9 )
> but I have some questions:
>
> - Is there a reason for the processes not to be killed? It was
> not a matter of time, as I could find them hours later.
>
> - I don't know if there should be any other action performed to
> completely clean the information, or whether killing the process the hard way
> is enough.
>
> - I am finding some memory consumption problems that I don't know
> if they are related to this. Maybe I will describe them in another
> message.
>
> Thanks,
>
> Jordi
>
> Jordi Blasi Uribarri
> Área I+D+i
>
> jbl...@nextel.es
> Oficina Bilbao
>
> [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]
>
process killing
Hi, I am currently developing a solution using Samza, and in the development process I need to constantly change the code and test it in the system. What I am seeing is that most of the time when I kill a job using the kill-yarn-job script, the job gets killed according to the web interface but I still see the Java process running. I have also seen that the job was actually still being executed, as I got messages at the far end of the application. I have been manually killing these processes (kill -9 ) but I have some questions:

- Is there a reason for the processes not to be killed? It was not a matter of time, as I could find them hours later.

- I don't know if there should be any other action performed to completely clean the information, or whether killing the process the hard way is enough.

- I am finding some memory consumption problems that I don't know if they are related to this. Maybe I will describe them in another message.

Thanks,

Jordi

Jordi Blasi Uribarri Área I+D+i jbl...@nextel.es Oficina Bilbao [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]
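A small shell sketch of the manual cleanup described above. The process-matching pattern and the simulated ps output are illustrative assumptions; in practice you would pipe from `ps -eo pid,args` and review the PIDs before killing anything:

```shell
# Extract PIDs of leftover Samza container JVMs from ps-style output.
# The printf stands in for: ps -eo pid,args
printf '1234 /usr/bin/java org.apache.samza.container.SamzaContainer run\n5678 /usr/bin/java OtherApp\n' |
  grep 'org.apache.samza' |
  awk '{print $1}'        # feed the result to: xargs -r kill -9
```

Against a live system, start the pipeline with `ps -eo pid,args | grep '[o]rg.apache.samza'` (the bracket trick keeps grep from matching its own process).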
RE: memory limits
I have tried changing all the jobs' configuration to this:

yarn.container.memory.mb=128
yarn.am.container.memory.mb=128

and on startup I can see:

2015-09-15 12:40:18 ClientHelper [INFO] set memory request to 128 for application_1442313590092_0002

On the web interface of Hadoop I see that every job is still getting 2 GB each. In fact, only two of the jobs are in the RUNNING state, while the rest are ACCEPTED. Any ideas? Thanks, Jordi

-----Original Message----- From: Yan Fang [mailto:yanfang...@gmail.com] Sent: Friday, 11 September 2015 20:56 To: dev@samza.apache.org Subject: Re: memory limits

Hi Jordi, I believe you can change the memory with *yarn.container.memory.mb*, default is 1024. And *yarn.am.container.memory.mb* is for the AM memory. See http://samza.apache.org/learn/documentation/0.9/jobs/configuration-table.html Thanks, Fang, Yan yanfang...@gmail.com

On Fri, Sep 11, 2015 at 4:21 AM, Jordi Blasi Uribarri wrote:
> Hi,
>
> I am trying to implement an environment that requires multiple
> combined Samza jobs for different tasks. I see that there is a limit
> to the number of jobs that can be running at the same time, as they each block
> 1 GB of RAM.
> I understand that this is a reasonable limit in a production
> environment (since we are speaking of Big Data, we need big
> amounts of resources ☺
> ), but my lab does not have that much RAM. Is there a way to reduce this
> limit so I can test it properly? I am using Samza 0.9.
>
> Thanks in advance,
>
> Jordi
> ____
> Jordi Blasi Uribarri
> Área I+D+i
>
> jbl...@nextel.es
> Oficina Bilbao
>
> [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]
>
memory limits
Hi, I am trying to implement an environment that requires multiple combined samza jobs for different tasks. I see that there is a limit to the number of jobs that can be running at the same time as they block 1GB of ram each. I understand that this is a reasonable limit in a production environment (as long as we are speaking of Big Data, we need big amounts of resources ☺ ) but my lab does not have so much ram. Is there a way to reduce this limit so I can test it properly? I am using Samza 0.9. Thanks in advance, Jordi Jordi Blasi Uribarri Área I+D+i jbl...@nextel.es Oficina Bilbao [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]
RE: Kill All Jobs
As I said, I am new to this procedure, but I guess I have done it. There goes my first contribution to the project!!! https://issues.apache.org/jira/browse/SAMZA-756 Bye, Jordi

-----Original Message----- From: Yan Fang [mailto:yanfang...@gmail.com] Sent: Friday, 14 August 2015 1:04 To: dev@samza.apache.org Subject: Re: Kill All Jobs

Hi Jordi, Thanks. This is useful. If possible, can you open a JIRA and upload the patch there? Thanks, Fang, Yan yanfang...@gmail.com

On Thu, Aug 6, 2015 at 8:04 AM, Shekar Tippur wrote:
> Thanks Jordi. This really helps.
>
> - Shekar
>
> On Thu, Aug 6, 2015 at 12:21 AM, Jordi Blasi Uribarri wrote:
>
> > Hi,
> >
> > As a little present (and I know this is not the way to get the code in the
> > project, but I am new to this sharing), I just made a simple script to kill
> > all the jobs running in Samza. It is supposed to live with kill-yarn-job.sh
> > in the bin folder. It saves me time, so maybe someone finds it helpful.
> >
> > [[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
> >
> > exec $(dirname $0)/run-class.sh org.apache.hadoop.yarn.client.cli.ApplicationCLI application -list |
> >   grep application_ | awk -F ' ' '{ print $1 }' | while read linea; do
> >     $(dirname $0)/kill-yarn-job.sh $linea
> >   done
> >
> > Hope it helps.
> >
> > Bye
> >
> > Jordi
> >
> > Jordi Blasi Uribarri
> > Área I+D+i
> >
> > jbl...@nextel.es
> > Oficina Bilbao
> >
> > [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]
> >
RE: Missing a change log offset for SystemStreamPartition
Thanks. You got the idea. At the moment I am going to start with the init method. The change-capture system was what I wanted to develop (but with a parallel job, no way there). The origin of the data should be a (soon to be developed) web admin. I was thinking of add and delete commands to alter the data, but I guess that if replication is an issue, I had better wait, as it can be tricky (if not impossible) to ensure that updates are distributed to all containers. I guess that my first problem is about the semantics of Samza (yes, containers are the copies :)). Thanks for your help. Jordi

-----Original Message----- From: Yi Pan [mailto:nickpa...@gmail.com] Sent: Wednesday, 12 August 2015 1:50 To: dev@samza.apache.org Subject: Re: Missing a change log offset for SystemStreamPartition

Hi, Jordi, I see your use case now. Essentially, you want to bootstrap adjunct data into a Samza job which will consume a stream and do a stream-table join with the pre-loaded adjunct data in the store. We have plenty of this kind of use case at LinkedIn. If your adjunct data set is small and static, you can simply load it in the init() method from external data sources. If your adjunct data set is big and may have updates as well, one popular setup at LinkedIn is:

1) Have a change-capture system associated with the external source which is also able to scan the whole data source to bootstrap
2) The change-capture system writes each record / record update into a Kafka system stream (i.e. change-capture-topic) to be consumed by the downstream Samza job
3) The downstream Samza job can be configured to bootstrap on the change-capture-topic and consume from input topic topicA. The Samza job will then simply bootstrap by consuming all messages in the change-capture-topic and updating the local KV-store, before starting to consume input topicA.

The change-capture system at LinkedIn is called Databus, which scans MySQL binlogs and sends the transactions into Kafka. 
Martin has written up some PostgreSQL change-capture work here: https://issues.apache.org/jira/browse/SAMZA-212. What's your external source? Does the above sound like the solution you are looking for? As for your last question about replicating the store to multiple containers (I assume that you meant container when stating "all copies of the job"), there is ongoing work on broadcast streams here: https://issues.apache.org/jira/browse/SAMZA-676. -Yi

On Tue, Aug 11, 2015 at 1:11 PM, Jordi Blasi Uribarri wrote:
> Hi,
>
> What I am trying to develop is (I think) an evolution of the
> stream-table join. For every message processed, depending on the value of
> a variable contained in it, go to the store and decide to which topic it must
> be sent. It is some kind of workflow manager (very simple). I can read from a
> store the data to guide the decision: key -> previous step, value -> output topic.
>
> My problem is how to make sure that this information is already available
> for the job when the process() method is called. I was trying to load
> this info in a (let's call it) configuration job that receives all the
> data pairs and loads them into the store. As I see this is not supported, so I
> need another way to get this info into the store.
>
> I see the same problem with the zip code example in the documentation:
> how do the zip codes get to the store so they can be crossed against
> the incoming messages?
>
> I am thinking of a solution that could, on the initialization
> process, read it from an external source, maybe a MySQL server, and load
> it into the store. Then on processing the messages it could access
> the data. Is this a correct way of doing it?
>
> I am not sure if I am explaining correctly what I am trying to do.
>
> The other question I have is: once the data is loaded into the store, is
> it replicated to all the copies of the job? 
> > thanks, > > Jordi > > > De: Yi Pan [nickpa...@gmail.com] > Enviado: martes, 11 de agosto de 2015 19:03 > Para: dev@samza.apache.org > Asunto: Re: Missing a change log offset for SystemStreamPartition > > Hi, Jordi, > > The local KV-store is meant to be accessed by the Samza container > locally on the machine. If you were referring to the use case that the > local KV-store is accessed by a container from a different Samza job, > it is not supported. And what does it exactly mean when you say "load > the DB to be able to use it from the consuming job"? We may be of more > help if we know your use case in more details. > > Thanks! > > -Yi > > > > On Tue, Aug 11, 2015 at 3:00 AM, Jordi Blasi Uribarri > > wrote: > > > InitableTask tas kwas missing. That responds to another problem that > >
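The bootstrap-on-change-capture-topic setup Yi describes can be sketched with Samza's documented stream properties (the stream and topic names here are illustrative):

```properties
# Consume both the change-capture topic and the main input.
task.inputs=kafka.change-capture-topic,kafka.topicA

# Fully catch up on the change-capture topic before processing topicA,
# and always start it from the earliest available offset.
systems.kafka.streams.change-capture-topic.samza.bootstrap=true
systems.kafka.streams.change-capture-topic.samza.offset.default=oldest
```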
Re: Missing a change log offset for SystemStreamPartition
Hi, What I am trying to develop is (I think) an evolution of the stream-table join. For every message processed, depending on the value of a variable contained in it, go to the store and decide to which topic it must be sent. It is some kind of workflow manager (very simple). I can read from a store the data to guide the decision: key -> previous step, value -> output topic.

My problem is how to make sure that this information is already available for the job when the process() method is called. I was trying to load this info in a (let's call it) configuration job that receives all the data pairs and loads them into the store. As I see this is not supported, so I need another way to get this info into the store.

I see the same problem with the zip code example in the documentation: how do the zip codes get to the store so they can be crossed against the incoming messages?

I am thinking of a solution that could, on the initialization process, read it from an external source, maybe a MySQL server, and load it into the store. Then on processing the messages it could access the data. Is this a correct way of doing it?

I am not sure if I am explaining correctly what I am trying to do.

The other question I have is: once the data is loaded into the store, is it replicated to all the copies of the job?

Thanks,

Jordi

From: Yi Pan [nickpa...@gmail.com] Sent: Tuesday, 11 August 2015 19:03 To: dev@samza.apache.org Subject: Re: Missing a change log offset for SystemStreamPartition

Hi, Jordi,

The local KV-store is meant to be accessed by the Samza container locally on the machine. If you were referring to a use case where the local KV-store is accessed by a container from a different Samza job, that is not supported. And what exactly does it mean when you say "load the DB to be able to use it from the consuming job"? We may be of more help if we know your use case in more detail.

Thanks!

-Yi

On Tue, Aug 11, 2015 at 3:00 AM, Jordi Blasi Uribarri wrote:

> InitableTask task was missing. That responds to another problem that
That responds to another problem that I was > experiencing (and left for later). Anyway the exception was still there > until I commented the changelog definition line in the properties file: > > #stores.test12db.changelog=kafka.test12db-changelog > > As I understand it in case of job going down information will be lost. > That is not a real issue as I am storing temporal information there. > > What I am seeing is something that does not work as I expected. Maybe I am > not understanding correctly how the system works. I need that a job has > access to the information previously stored in the storage. I was planning > a loader job that on receiving some messages with data it stores them in > RocksDb and that information should be consumed by a different consumer job > to use it for calculation. > > I see that in the loader job I can put and get information correctly. When > I try to access the same storage from a different job I just get null > results. > > How I am supposed to load the DB to be able to use it from the consuming > job? Is RocksDB the tool to use or should I use any other technique? > > Thanks, > > Jordi > > > > -Mensaje original- > De: Yi Pan [mailto:nickpa...@gmail.com] > Enviado el: martes, 11 de agosto de 2015 3:27 > Para: dev@samza.apache.org > Asunto: Re: Missing a change log offset for SystemStreamPartition > > Hi, Jordi, > > Agree with Yan. More specifically, your class definition should be > something like: > {code} > public class testStore implements StreamTask, InitableTask { ... > } > {code} > > On Mon, Aug 10, 2015 at 6:08 PM, Yan Fang wrote: > > > Hi Jordi, > > > > I think, you need to implement the *InitableTask* interface. > > Otherwise, the content in the init method will not be processed. 
> > > > Thanks, > > > > Fang, Yan > > yanfang...@gmail.com > > > > On Mon, Aug 10, 2015 at 3:24 AM, Jordi Blasi Uribarri > > > > wrote: > > > > > Just for making it easier to reproduce the problem I just reduced > > > the > > code > > > of the job to the minimum: > > > > > > package test; > > > > > > import org.apache.samza.config.Config; import > > > org.apache.samza.storage.kv.KeyValueStore; > > > import org.apache.samza.system.IncomingMessageEnvelope; > > > import org.apache.samza.task.MessageCollector; > > > import org.apache.samza.task.StreamTask; import > > > org.apache.samza.task.TaskContext; > > > import org.apache.samza.task.TaskCoordinator; > > > > > > public class testStore implements StreamTask { > > > private KeyValueStore s
RE: Missing a change log offset for SystemStreamPartition
InitableTask task was missing. That responds to another problem that I was experiencing (and left for later). Anyway, the exception was still there until I commented out the changelog definition line in the properties file:

#stores.test12db.changelog=kafka.test12db-changelog

As I understand it, in case of the job going down, information will be lost. That is not a real issue, as I am storing temporary information there.

What I am seeing is something that does not work as I expected. Maybe I am not understanding correctly how the system works. I need a job to have access to the information previously stored in the storage. I was planning a loader job that, on receiving some messages with data, stores them in RocksDB, and that information should be consumed by a different consumer job to use it for calculation.

I see that in the loader job I can put and get information correctly. When I try to access the same storage from a different job I just get null results.

How am I supposed to load the DB to be able to use it from the consuming job? Is RocksDB the tool to use or should I use some other technique?

Thanks,

Jordi

-----Original Message----- From: Yi Pan [mailto:nickpa...@gmail.com] Sent: Tuesday, 11 August 2015 3:27 To: dev@samza.apache.org Subject: Re: Missing a change log offset for SystemStreamPartition

Hi, Jordi,

Agree with Yan. More specifically, your class definition should be something like:

{code}
public class testStore implements StreamTask, InitableTask { ... }
{code}

On Mon, Aug 10, 2015 at 6:08 PM, Yan Fang wrote:

> Hi Jordi,
>
> I think you need to implement the *InitableTask* interface.
> Otherwise, the content in the init method will not be processed. 
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Mon, Aug 10, 2015 at 3:24 AM, Jordi Blasi Uribarri wrote:
> >
> > > Just for making it easier to reproduce the problem I just reduced the code
> > > of the job to the minimum:
> > >
> > > package test;
> > >
> > > import org.apache.samza.config.Config;
> > > import org.apache.samza.storage.kv.KeyValueStore;
> > > import org.apache.samza.system.IncomingMessageEnvelope;
> > > import org.apache.samza.task.MessageCollector;
> > > import org.apache.samza.task.StreamTask;
> > > import org.apache.samza.task.TaskContext;
> > > import org.apache.samza.task.TaskCoordinator;
> > >
> > > public class testStore implements StreamTask {
> > >     private KeyValueStore storestp;
> > >
> > >     public void init(Config config, TaskContext context) {
> > >         this.storestp = (KeyValueStore) context.getStore("test11db");
> > >     }
> > >
> > >     public void process(IncomingMessageEnvelope envelope,
> > >                         MessageCollector collector,
> > >                         TaskCoordinator coordinator) {
> > >         String msgin = (String) envelope.getMessage();
> > >         storestp.put("test1", msgin);
> > >     }
> > > }
> > >
> > > The properties file contains this:
> > >
> > > task.class=test.testStore
> > > job.name=test.testStore
> > > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> > > yarn.package.path=http://192.168.15.92/jobs/nxtBroker-0.0.1-bin.tar.gz
> > > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > > systems.kafka.consumer.zookeeper.connect=kfk-kafka01:2181,kfk-kafka02:2181
> > > systems.kafka.producer.bootstrap.servers=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:9093
> > > systems.kafka.producer.metadata.broker.list=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:909
> > >
> > > # Declare that we want our job's checkpoints to be written to Kafka
> > > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> > > task.checkpoint.system=kafka
> > >
> > > # The job consumes a topic called "configtpc" from the "kafka" system
> > > task.inputs=kafka.configtpc
> > >
> > > # Define a serializer/deserializer called "json" which parses JSON messages
> > > serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
> > > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> > >
> > > # Serializer for the system
> > > systems.kafka.samza.msg.serde=string
> > > systems.kafka.streams.tracetpc.samza.msg.serde=json
> > >
> > > # Use the key-value store implementation for a store called "my-store"
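The quoted properties break off at the store definition. A sketch of how such a RocksDB store section typically looks in Samza 0.9; the store name follows the code above, but the exact lines are an assumption since the original message is truncated:

```properties
# Key-value store backed by RocksDB; the serdes must be registered
# in the serializers.registry.* section above.
stores.test11db.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.test11db.key.serde=string
stores.test11db.msg.serde=string
# Optional changelog so the store can be restored after a failure:
stores.test11db.changelog=kafka.test11db-changelog
```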
RE: Missing a change log offset for SystemStreamPartition
at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

The job fails even when there is no message sent to the input topic. Samza is version 0.9.1 and Kafka 0.8.2. Thanks, Jordi

-----Original Message----- From: Jordi Blasi Uribarri [mailto:jbl...@nextel.es] Sent: Monday, 10 August 2015 10:26 To: dev@samza.apache.org Subject: RE: Missing a change log offset for SystemStreamPartition

Hi, I have migrated Samza to the latest version and recreated the job with a new store name so the streams were created clean. I am getting the same error:

java version "1.7.0_79"
OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-1~deb7u1)
OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)
log4j:WARN No appenders could be found for logger (org.apache.samza.metrics.JmxServer).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [kafka, commdb-changelog, 2]. 
at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87) at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87) at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) at scala.collection.AbstractMap.getOrElse(Map.scala:58) at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87) at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245) at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84) at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63) at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88) at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607) at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206) at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607) at 
org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550) at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108) at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87) at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala) Is there any other info I can attach to help find the problem? Thanks, Jordi -Mensaje original- De: Yan Fang [mailto:yanfang...@gmail.com] Enviado el: viernes, 07 de agosto de 2015 23:21 Para: dev@samza.apache.org Asunto: Re: Missing a change log offset for SystemStreamPartition Hi Jordi, Sorry for getting you back late. Was quite busy yesterday. I think the reason of your error
RE: Missing a change log offset for SystemStreamPartition
Hi,

I have migrated Samza to the latest version and recreated the job with a new store name, so the streams were created clean. I am getting the same error:

java version "1.7.0_79"
OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-1~deb7u1)
OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)
log4j:WARN No appenders could be found for logger (org.apache.samza.metrics.JmxServer).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [kafka, commdb-changelog, 2].
    at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
    at scala.collection.AbstractMap.getOrElse(Map.scala:58)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
    at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
    at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
    at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
    at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
    at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

Is there any other info I can attach to help find the problem?

Thanks,

Jordi

-----Original message-----
From: Yan Fang [mailto:yanfang...@gmail.com]
Sent: Friday, 7 August 2015 23:21
To: dev@samza.apache.org
Subject: Re: Missing a change log offset for SystemStreamPartition

Hi Jordi,

Sorry for getting back to you late; I was quite busy yesterday. I think the reason for your error is that you mismatched the Samza and Kafka versions. It seems you are using Samza 0.8.1 with Kafka 0.8.2, which is not supported. So my suggestion is to upgrade to *Samza 0.9.1* and use *Kafka 0.8.2*; this combination is proven to work. Hope this helps.

Thanks,
Fang, Yan
yanfang...@gmail.com

On Thu, Aug 6, 2015 at 1:16 AM, Jordi Blasi Uribarri wrote:
> I changed the job name and the store name. I was defining two different stores and, in case that was the problem, I also eliminated the second one.
> I am getting the same exception.
>
> Exception in thread "main" org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [kafka, testdb-changelog, 2].
>     at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
>     at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
>     at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>     at scala.collection.AbstractMap.getOrElse(Map.scala:58)
>     at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
>     at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.a
RE: Missing a change log offset for SystemStreamPartition
I changed the job name and the store name. I was defining two different stores and, in case that was the problem, I also eliminated the second one. I am getting the same exception.

Exception in thread "main" org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [kafka, testdb-changelog, 2].
    at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
    at scala.collection.AbstractMap.getOrElse(Map.scala:58)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
    at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
    at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
    at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
    at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
    at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

As I have autocreate configured in Kafka, I am not creating anything for the store. Is that OK? By the way, is there any problem with having two different stores?

Thanks,

Jordi

-----Original message-----
From: Yan Fang [mailto:yanfang...@gmail.com]
Sent: Wednesday, 5 August 2015 20:23
To: dev@samza.apache.org
Subject: Re: Missing a change log offset for SystemStreamPartition

Hi Jordi,

I wonder whether the reason for your first exception is that you changed the task number (the partition number of your input stream) but were still using the same changelog stream, so it is trying to send to partition 2, which does not exist. Can you reproduce this exception in a new job (new store name, new job name)?

The second exception is caused by a wrong offset format, I believe.

Let me know how the new job goes.

Thanks,
Fang, Yan
yanfang...@gmail.com

On Wed, Aug 5, 2015 at 12:51 AM, Jordi Blasi Uribarri wrote:
> Hi,
>
> I am trying to use the key-value store to manage some state information.
> Basically this is the code I am using. As far as I have tested, the rest is working correctly.
>
> private KeyValueStore storestp;
>
> public void init(Config config, TaskContext context) {
>     this.storestp = (KeyValueStore) context.getStore("stepdb");
> }
>
> public void process(IncomingMessageEnvelope envelope,
>                     MessageCollector collector,
>                     TaskCoordinator coordinator) {
>     …
>     String str = storestp.get(code);
>     …
> }
>
> When I load it, it goes to Running but, when I send the messages through the Kafka stream, it goes to Failed state. I have found this exception:
>
> Exception in thread "main" org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [kafka, stepdb-changelog, 2].
>     at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
>     at org.apac
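For context, a store such as the "stepdb" used in init() above must also be declared in the job's .properties file. A minimal sketch, assuming Samza 0.9 with the RocksDB backend and string serdes (verify the factory class name against your Samza version):

```properties
# Hypothetical declaration for the "stepdb" store used via context.getStore("stepdb").
stores.stepdb.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.stepdb.key.serde=string
stores.stepdb.msg.serde=string
# Changelog topic backing the store; its partition count must match the
# job's task count, or the "Missing a change log offset" error above appears.
stores.stepdb.changelog=kafka.stepdb-changelog
```

Note that the changelog-partition mismatch is exactly what the reply in this thread suspects: the changelog was created with fewer partitions than the job now has tasks.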
Kill All Jobs
Hi,

As a little present (and I know this is not the proper way to get code into the project, but I am new to this sharing): I just made a simple script to kill all the jobs running in Samza. It is supposed to live next to kill-yarn-job.sh in the bin folder. It saves me time, so maybe someone finds it helpful.

[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"

exec $(dirname $0)/run-class.sh org.apache.hadoop.yarn.client.cli.ApplicationCLI application -list | grep application_ | awk -F ' ' '{ print $1 }' | while read linea
do
    $(dirname $0)/kill-yarn-job.sh $linea
done

Hope it helps.

Bye

Jordi

____
Jordi Blasi Uribarri
Área I+D+i
jbl...@nextel.es
Oficina Bilbao
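The grep/awk stage of the script above can be exercised on its own against sample output; the application IDs below are hypothetical stand-ins for what `yarn application -list` prints:

```shell
# Hypothetical 'yarn application -list' output; only the application IDs
# in column 1 should survive the grep/awk pipeline used by the script.
sample='Total number of applications:2
application_1427403490569_0001  samzafroga.job1  SAMZA  default  RUNNING
application_1427403490569_0002  samzafroga.job2  SAMZA  default  RUNNING'

printf '%s\n' "$sample" | grep application_ | awk -F ' ' '{ print $1 }'
```

This prints one application ID per line, which the `while read` loop then feeds to kill-yarn-job.sh.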
Missing a change log offset for SystemStreamPartition
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.samza.storage.TaskStorageManager.restoreStores(TaskStorageManager.scala:106)
    at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:64)
    at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
    at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
    at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
    at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

Is there something wrong?

Thanks,

Jordi
RE: testThreadInterruptInOperationSleep on clean installation
It has compiled perfectly now. Thanks for your help.

Jordi

-----Original message-----
From: Navina Ramesh [mailto:nram...@linkedin.com.INVALID]
Sent: Tuesday, 4 August 2015 9:07
To: dev@samza.apache.org
Subject: Re: testThreadInterruptInOperationSleep on clean installation

Hi Jordi,

It looks like you are hitting this error:

org.apache.samza.storage.kv.TestRocksDbKeyValueStore > testTTL STANDARD_ERROR
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
*/usr/lib/jvm/java-7-openjdk-amd64/bin/java: symbol lookup error: /tmp/librocksdbjni1871304830042440508..so: undefined symbol: clock_gettime*

This is a known issue when building Samza on Linux and is being tracked in SAMZA-747 <https://issues.apache.org/jira/browse/SAMZA-747>. It is blocked on the RocksDB 3.11.1 release and is hence a blocker for the Samza 0.10 release as well.

If the build is only failing on this test, try commenting out testTTL in TestRocksDbKeyValueStore.scala and give it a try. I hope that solves your build issues.

Thanks!
Navina

On Mon, Aug 3, 2015 at 11:53 PM, Jordi Blasi Uribarri wrote:
> Hi,
>
> There was no Gradle process running, and I have tried to run it with an increased log level. It seems that the failing test is the one related to RocksDB.
>
> How can I run the test in Eclipse?
>
> This is what I get:
>
> Executing task ':samza-kv-rocksdb_2.10:sourcesJar' (up-to-date check took 0.005 secs) due to:
> Output file /opt/samza/samza-kv-rocksdb/build/libs/samza-kv-rocksdb_2.10-0.10.0-SNAPSHOT-sources.jar has changed.
> Output file /opt/samza/samza-kv-rocksdb/build/libs/samza-kv-rocksdb_2.10-0.10.0-SNAPSHOT-sources.jar has been removed.
> :samza-kv-rocksdb_2.10:sourcesJar (Thread[Daemon,5,main]) completed. Took 0.019 secs.
> :samza-kv-rocksdb_2.10:signArchives (Thread[Daemon,5,main]) started.
> :samza-kv-rocksdb_2.10:signArchives
> Skipping task ':samza-kv-rocksdb_2.10:signArchives' as task onlyIf is false.
> :samza-kv-rocksdb_2.10:signArchives SKIPPED
> :samza-kv-rocksdb_2.10:signArchives (Thread[Daemon,5,main]) completed. Took 0.003 secs.
> :samza-kv-rocksdb_2.10:assemble (Thread[Daemon,5,main]) started.
> :samza-kv-rocksdb_2.10:assemble
> Skipping task ':samza-kv-rocksdb_2.10:assemble' as it has no actions.
> :samza-kv-rocksdb_2.10:assemble (Thread[Daemon,5,main]) completed. Took 0.001 secs.
> :samza-kv-rocksdb_2.10:compileTestJava (Thread[Daemon,5,main]) started.
> :samza-kv-rocksdb_2.10:compileTestJava
> Skipping task ':samza-kv-rocksdb_2.10:compileTestJava' as it has no source files.
> :samza-kv-rocksdb_2.10:compileTestJava UP-TO-DATE
> :samza-kv-rocksdb_2.10:compileTestJava (Thread[Daemon,5,main]) completed. Took 0.001 secs.
> :samza-kv-rocksdb_2.10:compileTestScala (Thread[Daemon,5,main]) started.
> :samza-kv-rocksdb_2.10:compileTestScala
> Executing task ':samza-kv-rocksdb_2.10:compileTestScala' (up-to-date check took 0.068 secs) due to:
> Output file /opt/samza/samza-kv-rocksdb/build/classes/test has changed.
> Output file /opt/samza/samza-kv-rocksdb/build/classes/test/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.class has been removed.
> Output file /opt/samza/samza-kv-rocksdb/build/classes/test/org/apache/samza/storage/kv/TestRocksDbKeyValueStore$$anonfun$testTTL$2.class has been removed.
> Compiling with Ant scalac task.
> [ant:scalac] Compiling 1 source file to /opt/samza/samza-kv-rocksdb/build/classes/test
> [ant:scalac] Element '/opt/samza/samza-kv-rocksdb/build/resources/main' does not exist.
> :samza-kv-rocksdb_2.10:compileTestScala (Thread[Daemon,5,main]) completed. Took 1.158 secs.
> :samza-kv-rocksdb_2.10:processTestResources (Thread[Daemon,5,main]) started.
> :samza-kv-rocksdb_2.10:processTestResources
> Skipping task ':samza-kv-rocksdb_2.10:processTestResources' as it has no source files.
> :samza-kv-rocksdb_2.10:processTestResources UP-TO-DATE
> :samza-kv-rocksdb_2.10:processTestResources (Thread[Daemon,5,main]) completed. Took 0.003 secs.
> :samza-kv-rocksdb_2.10:testClasses (Thread[Daemon,5,main]) started.
> :samza-kv-rocksdb_2.10:testClasses
> Skipping task ':samza-kv-rocksdb_2.10:testClasses' as it has no actions.
> :samza-kv-rocksdb_2.10:testClasses (Thread[Daemon,5,main]) completed. Took 0.001 secs.
> :samza-kv-rocksdb_2.10:test (Thread[Daemon,5,main]) started.
> :samza-kv-rocksdb_2.10:test
> Executing task ':samza-kv-rocksdb_2.10:test' (up-to-date check took
log4j configuration
Hi,

I guess this is just a how-to question, but I am not able to find out how it works. I am trying to trace the code of the job I want to execute in Samza. I have defined the environment variable as stated in the documentation:

export SAMZA_LOG_DIR=/opt/logs

I believe this is working, as I have seen that the garbage collector correctly generates the gc.log file and writes to it. The directory exists on all Samza servers (two in my lab).

I have added the following code to the job:

Logger logger = Logger.getLogger(GetConfig.class);
logger.info("Trace text message");

Although the code is being executed (I can see the messages going through), I see no trace written. I don't have experience with log4j, and maybe that is where I have to look, but am I missing something?

Thanks,

Jordi
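A likely cause for the silent logger above is that no log4j appender is configured in the container, so logger.info output is dropped. A minimal log4j.xml sketch, assuming log4j 1.2 on the job's classpath and the samza.log.dir system property that Samza's run scripts derive from SAMZA_LOG_DIR (file name and pattern are arbitrary choices here):

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
  <appender name="jobLog" class="org.apache.log4j.FileAppender">
    <!-- samza.log.dir is passed as -Dsamza.log.dir by the run scripts -->
    <param name="File" value="${samza.log.dir}/job.log"/>
    <layout class="org.apache.log4j.PatternLayout">
      <param name="ConversionPattern" value="%d %-5p [%t] %c - %m%n"/>
    </layout>
  </appender>
  <root>
    <priority value="info"/>
    <appender-ref ref="jobLog"/>
  </root>
</log4j:configuration>
```

The file must be reachable by the container and referenced with -Dlog4j.configuration (as the kill-all script in this archive does for the console variant).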
RE: testThreadInterruptInOperationSleep on clean installation
Can you:

1) kill all the GradleDaemon and GradleWrapperMain processes when you rerun the build?
2) try to run those two tests in Eclipse (or some other way) without Gradle?

I suspect both are related to Gradle.

Thanks,
Fang, Yan
yanfang...@gmail.com

On Sun, Aug 2, 2015 at 11:45 PM, Jordi Blasi Uribarri wrote:
> Hi,
>
> I am trying to do a clean installation of Samza on a newly installed Debian 7.8 box. Following the steps I collected in a previous 0.8.2 Samza installation, I have performed the following:
>
> apt-get install openjdk-7-jdk openjdk-7-jre git maven curl
> vi /root/.bashrc
> export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
> export CLASSPATH=$CLASSPATH:/usr/share/java
>
> cd /opt
> git clone http://git-wip-us.apache.org/repos/asf/samza.git
> cd samza
> ./gradlew clean build
>
> Every time I run it I get an error on the test the script runs:
>
> testThreadInterruptInOperationSleep
> java.lang.AssertionError: expected:<1> but was:<0>
>     at org.junit.Assert.fail(Assert.java:91)
>     at org.junit.Assert.failNotEquals(Assert.java:645)
>     at org.junit.Assert.assertEquals(Assert.java:126)
>     at org.junit.Assert.assertEquals(Assert.java:470)
>     at org.junit.Assert.assertEquals(Assert.java:454)
>     at org.apache.samza.util.TestExponentialSleepStrategy.testThreadInterruptInOperationSleep(TestExponentialSleepStrategy.scala:158)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:76)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69)
>     at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>     at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>     at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>     at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>     at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(
Dynamically assigning output topic
Hi,

Is there a way to assign, dynamically in the job code, the topic where the output messages should be sent? I am testing an idea that needs to decide the destination topic programmatically (and not from a limited range). As I understand it, I need to declare the output format in the job properties file, so I need to know the topic name in advance. Is there a way to free myself from this dependency?

Thanks.

Jordi
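For what it's worth, in Samza the output SystemStream does not have to be declared in the properties file: inside process() you can build one per message and call collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", topic), msg)), where "kafka" is the configured system name (the destination topic must exist or Kafka auto-create must be on). The self-contained sketch below shows only the per-message topic-selection step; the class and the naming scheme are hypothetical:

```java
// Hypothetical routing helper: picks a destination topic per message.
// In a real StreamTask.process() you would then send with the Samza API:
//   collector.send(new OutgoingMessageEnvelope(
//       new SystemStream("kafka", DynamicTopicRouter.topicFor(code)), msg));
class DynamicTopicRouter {
    /** Map a message code to a destination topic (hypothetical scheme). */
    static String topicFor(String code) {
        if (code == null || code.isEmpty()) {
            return "samza-default"; // fallback topic, assumed to exist
        }
        return "samza-" + code.toLowerCase();
    }

    public static void main(String[] args) {
        System.out.println(topicFor("ALARM")); // prints samza-alarm
    }
}
```

The routing logic stays a plain function, so it can be unit-tested without a running container.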
testThreadInterruptInOperationSleep on clean installation
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: empty.txt (No existe el fichero o el directorio)
    at java.io.RandomAccessFile.open(Native Method)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:241)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:122)
    at org.apache.samza.system.filereader.FileReaderSystemConsumer$$anon$1.run(FileReaderSystemConsumer.scala:121)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    ... 2 more
Exception in thread "SAMZA-filereader-" java.lang.Error: java.io.FileNotFoundException: twoEnter.txt (No existe el fichero o el directorio)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: twoEnter.txt (No existe el fichero o el directorio)
    at java.io.RandomAccessFile.open(Native Method)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:241)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:122)
    at org.apache.samza.system.filereader.FileReaderSystemConsumer$$anon$1.run(FileReaderSystemConsumer.scala:121)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    ... 2 more
Exception in thread "SAMZA-filereader-" java.lang.Error: java.io.FileNotFoundException: noEnter.txt (No existe el fichero o el directorio)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: noEnter.txt (No existe el fichero o el directorio)
    at java.io.RandomAccessFile.open(Native Method)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:241)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:122)
    at org.apache.samza.system.filereader.FileReaderSystemConsumer$$anon$1.run(FileReaderSystemConsumer.scala:121)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    ... 2 more
Exception in thread "SAMZA-filereader-" java.lang.Error: java.io.FileNotFoundException: moreEnter.txt (No existe el fichero o el directorio)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: moreEnter.txt (No existe el fichero o el directorio)
    at java.io.RandomAccessFile.open(Native Method)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:241)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:122)
    at org.apache.samza.system.filereader.FileReaderSystemConsumer$$anon$1.run(FileReaderSystemConsumer.scala:121)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    ... 2 more
Exception in thread "SAMZA-filereader-" java.lang.Error: java.io.FileNotFoundException: oneEnter.txt (No existe el fichero o el directorio)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: oneEnter.txt (No existe el fichero o el directorio)
    at java.io.RandomAccessFile.open(Native Method)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:241)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:122)
    at org.apache.samza.system.filereader.FileReaderSystemConsumer$$anon$1.run(FileReaderSystemConsumer.scala:121)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    ... 2 more

What am I missing?

Thanks in advance.

Bye

Jordi
Samza install quick guide
Hi,

I am not sure what the correct way of doing this is (I am sure this is not it, but anyway…). Following your advice I was able to get Samza working, and now I am trying to explore its full capabilities. As promised, the notes (a quick installation guide) I was writing in the process are in the Pastebin link I include here. I hope it helps the next newbie that comes to Samza like me.

http://pastebin.com/XDDhi29k

I don't know how long they will keep it there, so I recommend that if you find it interesting, you download it and put it somewhere accessible.

Bye,

Jordi
RE: java.lang.NoClassDefFoundError on Yarn job
I thought it was working, but not really. The job runs and I can see it on the web admin, but when it has to process a message it fails: it goes down and I get this exception:

Application application_1427403490569_0002 failed 2 times due to AM Container for appattempt_1427403490569_0002_02 exited with exitCode: -1000
For more detailed output, check the application tracking page: http://samza01:8088/proxy/application_1427403490569_0002/ Then, click on links to logs of each attempt.
Diagnostics: No FileSystem for scheme: http
java.io.IOException: No FileSystem for scheme: http
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:249)
    at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Failing this attempt. Failing the application.

What am I doing wrong?

Thanks,

Jordi

-----Original message-----
From: Jordi Blasi Uribarri [mailto:jbl...@nextel.es]
Sent: Friday, 27 March 2015 9:45
To: dev@samza.apache.org
Subject: RE: java.lang.NoClassDefFoundError on Yarn job

Solved. My application was using version 0.9.0 of yarn; when I downgraded to 0.8.0, it worked.

Thanks,

Jordi

-----Original message-----
From: Jordi Blasi Uribarri [mailto:jbl...@nextel.es]
Sent: Friday, 27 March 2015 9:05
To: dev@samza.apache.org
Subject: RE: java.lang.NoClassDefFoundError on Yarn job

I did the steps that were included in the case and I am getting the same error.

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:191)
    at org.apache.samza.job.JobRunner.run(JobRunner.scala:56)
    at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
    at org.apache.samza.job.JobRunner.main(JobRunner.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 5 more

The only difference I see is that I downloaded Hadoop 2.6 instead of 2.4. Is the version mandatory? Should I downgrade?

Thanks,

Jordi

-----Original message-----
From: Roger Hoover [mailto:roger.hoo...@gmail.com]
Sent: Thursday, 26 March 2015 17:25
To: dev@samza.apache.org
Subject: Re: java.lang.NoClassDefFoundError on Yarn job

Hi Jordi,

You might be running into this issue (https://issues.apache.org/jira/browse/SAMZA-456), which I just hit as well. You probably need to add a couple more jars to your YARN lib dir.

Cheers,

Roger

On Thu, Mar 26, 2015 at 9:21 AM, Jordi Blasi Uribarri wrote:
> Hi:
>
> I got Samza running a job in local mode with the property:
> job.factory.class=org.apache.samza.job.local.ThreadJobFactory
>
> Now I am trying to get it running on multiple machines. I have followed the steps in the following guide:
>
> https://github.com/apache/samza/blob/master/docs/learn/tutorials/versioned/run-in-multi-node-yarn.md
>
> I see the node up and running.
>
> I have created a tar.gz file with the contents of the bin and lib folders that were running Yarn locally and published it in a local Apache2 we
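The "No FileSystem for scheme: http" diagnostic above usually means the YARN NodeManagers have no filesystem registered for http:// URLs, so they cannot localize the yarn.package.path tarball. One documented approach is to register Samza's HTTP filesystem in Hadoop's core-site.xml on each NodeManager (with samza-yarn and its dependencies on the NodeManager classpath); treat the exact class name and placement as something to verify against the Samza multi-node YARN tutorial for your version:

```xml
<!-- core-site.xml on each NodeManager: lets YARN localize
     yarn.package.path URLs served over plain HTTP. -->
<configuration>
  <property>
    <name>fs.http.impl</name>
    <value>org.apache.samza.util.hadoop.HttpFileSystem</value>
  </property>
</configuration>
```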
RE: java.lang.NoClassDefFoundError on Yarn job
Solved. My application was using the 0.9.0 version of YARN. When I downgraded to 0.8.0, it worked. Thanks, Jordi
RE: java.lang.NoClassDefFoundError on Yarn job
I did the steps that were included in the case and I am getting the same error. Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:191) at org.apache.samza.job.JobRunner.run(JobRunner.scala:56) at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37) at org.apache.samza.job.JobRunner.main(JobRunner.scala) Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 5 more The only difference I see is that I downloaded Hadoop 2.6 instead of 2.4. Is the version mandatory? Should I downgrade? Thanks, Jordi
java.lang.NoClassDefFoundError on Yarn job
Hi: I got samza running a job in local mode with the property: job.factory.class=org.apache.samza.job.local.ThreadJobFactory Now I am trying to get it running on multiple machines. I have followed the steps in the following guide: https://github.com/apache/samza/blob/master/docs/learn/tutorials/versioned/run-in-multi-node-yarn.md I see the node up and running. I have created a tar.gz file with the contents of the bin and lib folders that were running Yarn locally, and published it on a local Apache2 web server. The properties file looks like this: task.class=samzafroga.job1 job.name=samzafroga.job1 job.factory.class=org.apache.samza.job.yarn.YarnJobFactory yarn.package.path= http://192.168.15.92/jobs/samzajob1.tar.gz systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory systems.kafka.consumer.zookeeper.connect= broker01:2181 systems.kafka.producer.bootstrap.servers= broker01:9092 task.inputs=kafka.syslog serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory systems.kafka.streams.syslog.samza.msg.serde=string systems.kafka.streams.samzaout.samza.msg.serde=string When I run the same command that was working in local mode: bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/job1.properties I see the following exception: java version "1.7.0_75" OpenJDK Runtime Environment (IcedTea 2.5.4) (7u75-2.5.4-2) OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode) /usr/lib/jvm/java-7-openjdk-amd64/bin/java -Dlog4j.configuration=file:bin/log4j-console.xml -Dsamza.log.dir=/opt/jobs -Djava.io.tmpdir=/opt/jobs/tmp -Xmx768M -XX:+PrintGCDateStamps -Xloggc:/opt/jobs/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024 -d64 -cp /opt/hadoop/conf:/opt/jobs/lib/samzafroga-0.0.1-jar-with-dependencies.jar org.apache.samza.job.JobRunner
--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file:///opt/jobs/job1.properties log4j: reset attribute= "false". log4j: Threshold ="null". log4j: Level value for root is [INFO]. log4j: root level set to INFO log4j: Class name: [org.apache.log4j.ConsoleAppender] log4j: Parsing layout of class: "org.apache.log4j.PatternLayout" log4j: Setting property [conversionPattern] to [%d{dd MMM HH:mm:ss} %5p %c{1} - %m%n]. log4j: Adding appender named [consoleAppender] to category [root]. log4j: Class name: [org.apache.log4j.RollingFileAppender] log4j: Setting property [append] to [false]. log4j: Setting property [file] to [out/learning.log]. log4j: Parsing layout of class: "org.apache.log4j.PatternLayout" log4j: Setting property [conversionPattern] to [%d{ABSOLUTE} %-5p [%c{1}] %m%n]. log4j: setFile called: out/learning.log, false log4j: setFile ended log4j: Adding appender named [fileAppender] to category [root]. Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:191) at org.apache.samza.job.JobRunner.run(JobRunner.scala:56) at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37) at org.apache.samza.job.JobRunner.main(JobRunner.scala) Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 5 more I guess there is a problem with the job package but I am not sure how to solve it. 
Thanks, Jordi Jordi Blasi Uribarri R&D Department jbl...@nextel.es Bilbao Office
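The NoClassDefFoundError in this thread means org.apache.hadoop.conf.Configuration was not on the classpath that run-job.sh launched JobRunner with. A quick way to verify which jars made it onto the launch classpath is to reproduce the same Class.forName lookup that fails inside JobRunner. This is a small illustrative helper, not part of the original thread; the class name ClasspathCheck is made up for the example:

```java
// Hypothetical helper (not from this thread): reproduces the Class.forName
// lookup that fails inside JobRunner, so you can verify whether a given class
// is reachable from the classpath you launch with.
public class ClasspathCheck {
    static boolean isPresent(String className) {
        try {
            Class.forName(className);          // same lookup JobRunner performs
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Default to the class the stack trace above reports as missing.
        String cls = args.length > 0 ? args[0] : "org.apache.hadoop.conf.Configuration";
        System.out.println(cls + (isPresent(cls) ? " is on the classpath" : " is MISSING"));
    }
}
```

Running it with the same -cp value that run-job.sh prints (for example /opt/hadoop/conf:/opt/jobs/lib/samzafroga-0.0.1-jar-with-dependencies.jar) shows directly whether the Hadoop jars were bundled, which is what the SAMZA-456 fix of adding jars to the YARN lib dir addresses.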
RE: cannot be cast to java.lang.String
Got it!!! It was all about (as Chinmay pointed out) defining the serializer correctly: serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory systems.kafka.streams.syslog.samza.msg.serde=string systems.kafka.streams.samzaout.samza.msg.serde=string Now going for the next step. Thanks a lot, Jordi
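Chinmay's point about envelope.getMessage.asInstanceOf[Array[Byte]] can be illustrated with a self-contained sketch. These are not the real Samza classes, just plain Java with illustrative names: a string serde turns the task's String into byte[] before the producer runs, so the producer-side byte[] cast (the source of the ClassCastException when no serde was configured) succeeds.

```java
import java.nio.charset.StandardCharsets;

// Illustrative sketch, not the actual Samza classes: a string serde converts
// the task's String into byte[] before the Kafka producer sees it. Without the
// serde, the producer's cast to byte[] is what threw ClassCastException.
public class StringSerdeSketch {
    // Roughly what a StringSerde's toBytes/fromBytes pair does.
    static byte[] toBytes(String msg) {
        return msg.getBytes(StandardCharsets.UTF_8);
    }

    static String fromBytes(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Object outgoing = toBytes("hello syslog line"); // serde configured: message is already byte[]
        byte[] wire = (byte[]) outgoing;                // the producer-side cast now succeeds
        System.out.println(fromBytes(wire));
    }
}
```

With systems.kafka.streams.<stream>.samza.msg.serde=string configured, the serde performs this conversion between the task and the producer, which is why the config lines above fixed the error.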
RE: cannot be cast to java.lang.String
I am not sure I understand what you mean. Does this mean that the Kafka producer is not valid? Should I test a producer specifically made for Samza, or is it a configuration issue? I have working producers and they are sending strings in different data structures, but they all end up being sent as Strings through the Kafka API (props.put("serializer.class", "kafka.serializer.StringEncoder");). I have had the same issue with this code. Thanks, Jordi -----Original Message----- From: Chinmay Soman [mailto:chinmay.cere...@gmail.com] Sent: Wednesday, March 25, 2015 16:59 To: dev@samza.apache.org Subject: Re: cannot be cast to java.lang.String I think this is a bit specific to Samza. In the KafkaSystemProducer class, it does something like this: envelope.getMessage.asInstanceOf[Array[Byte]] and not just 'byte[]'. This is why we need to be explicit about the serialization format.
RE: cannot be cast to java.lang.String
I am using the Kafka command line producer, so I understand that I am sending a String. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic syslog What is actually the difference between a string and a json? Is it just a matter of deserialization, or is there any kind of metadata included that specifies the content type? How do I enable the debug mode? Thanks, Jordi -----Original Message----- From: Chinmay Soman [mailto:chinmay.cere...@gmail.com] Sent: Monday, March 23, 2015 17:55 To: dev@samza.apache.org Subject: Re: cannot be cast to java.lang.String Hey Jordi, This is because you're sending String and not json in your output topic. Try setting string on the output stream as well (if you haven't already). If you have done that, then please enable debug mode and attach the log somewhere so that we can take a look.
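For the "How do I enable the debug mode?" question above: the job is already launched with -Dlog4j.configuration=file:bin/log4j-console.xml, and the startup log shows the root logger at INFO, so lowering the root level to DEBUG in that file is usually enough. A minimal sketch, assuming the stock log4j 1.2 XML layout visible in the log output above (appender and pattern names are taken from that output):

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
  <appender name="consoleAppender" class="org.apache.log4j.ConsoleAppender">
    <layout class="org.apache.log4j.PatternLayout">
      <param name="ConversionPattern" value="%d{dd MMM HH:mm:ss} %5p %c{1} - %m%n"/>
    </layout>
  </appender>
  <root>
    <!-- INFO in the original config; DEBUG surfaces serde and producer detail -->
    <priority value="DEBUG"/>
    <appender-ref ref="consoleAppender"/>
  </root>
</log4j:configuration>
```

Note this only affects the process the config file is passed to; containers launched on YARN read whatever log4j configuration is bundled in the job package.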
Re: Tags for Samza in stack overflow
I tried to do it in the question you (kindly) answered and found that same problem. I only have 150 reputation (I still have some distance to go as well :) ). I don't really know if there is a way to do it collectively, or whether you should address the Stack Overflow admins as project "owners" and ask for a tag. What I would recommend, as a new user in a learning process, is to clarify the way of getting help. Maybe a specific page in the documentation specifying that this list is a good place to search and ask for solutions to problems. I also missed an intermediate guide between Hello Samza and the documentation with the full range of options and the API. When I get Samza fully working I will make a little getting-started guide available with all the steps I followed. As I believe it is an easy process but I lacked the references to do it, I hope it will make it easier for the rest. Best regards, Jordi From: Navina Ramesh [nram...@linkedin.com.INVALID] Sent: Tuesday, March 24, 2015 3:40 To: dev@samza.apache.org Subject: Tags for Samza in stack overflow Hi all, I think it is a good idea to improve our presence in stackoverflow.com because: * any new user/developer mostly googles/searches stackoverflow for resolving issues before hitting us on the mailing list; this requires the user to subscribe to the mailing list -> unnecessary overhead, apart from the frustration that Google didn't lead you to the correct answer! * if we can make it easier to resolve any issue, it can enhance the user adoption experience. In order to track Samza questions posted in stackoverflow.com, I wanted to create tags. Unfortunately, creating a tag requires a reputation of 1500 and my reputation is just 1 :) If any stackoverflow enthusiast here wants to earn some good karma, please help us in creating a tag - "apache-samza" - in stack overflow. Thanks! Navina
RE: cannot be cast to java.lang.String
Looks like that was one error. I have set the property like this: systems.kafka.streams.syslog.samza.msg.serde=string But I am still getting the same error. Now I am seeing a different thing in the log previous to the exception: 23 mar 2015 05:49:31 INFO KafkaSystemProducer - Creating a new producer for system kafka. 23 mar 2015 05:49:31 INFO ProducerConfig - ProducerConfig values: value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer block.on.buffer.full = true retry.backoff.ms = 100 buffer.memory = 33554432 batch.size = 16384 metrics.sample.window.ms = 3 metadata.max.age.ms = 30 receive.buffer.bytes = 32768 timeout.ms = 3 max.in.flight.requests.per.connection = 1 bootstrap.servers = [broker01:9092] metric.reporters = [] client.id = samza_producer-samzafroga_job1-1-1427086163149-3 compression.type = none retries = 2147483647 max.request.size = 1048576 send.buffer.bytes = 131072 acks = 1 reconnect.backoff.ms = 10 linger.ms = 0 metrics.num.samples = 2 metadata.fetch.timeout.ms = 6 23 mar 2015 05:49:31 ERROR SamzaContainer - Caught exception in process loop. java.lang.ClassCastException: java.lang.String cannot be cast to [B at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:80) at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:87) at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61) at samzafroga.job1.process(job1.java:21) at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:129) at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54) at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:128) at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:114) at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37) at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36) at org.apache.samza.container.RunLoop.process(RunLoop.scala:100) at org.apache.samza.container.RunLoop.run(RunLoop.scala:83) at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:549) at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42) Thanks. Jordi -----Original Message----- From: Chinmay Soman [mailto:chinmay.cere...@gmail.com] Sent: Monday, March 23, 2015 17:36 To: dev@samza.apache.org Subject: Re: cannot be cast to java.lang.String Have you tried setting this: systems.kafka.streams.syslog.samza.msg.serde=string // And assuming you've defined a 'string' serializer in your config OR systems.kafka.streams.syslog.samza.msg.serde=json // Depending on the corresponding format of your input data
RE: cannot be cast to java.lang.String
Hi,

As I understand it, I am setting "kafka" as the system name, "beste" as the output topic in the system and "syslog" as the input topic. Both topics, syslog and beste, are working correctly: I am streaming some syslogs to the "syslog" topic and I am testing "beste" with an internal application designed specifically for that. I am not sure about the kafka part.

Thanks.

    Jordi

-----Original Message-----
From: Chinmay Soman [mailto:chinmay.cere...@gmail.com]
Sent: Monday, 23 March 2015 17:16
To: dev@samza.apache.org
Subject: Re: cannot be cast to java.lang.String

Hey Jordi,

I see 3 different stream names:

1. new SystemStream("kafka", "beste");
2. task.inputs=kafka.syslog
3. systems.kafka.streams.frogain.samza.msg.serde=json

Just for a sanity check, can you double-check that you're setting the config params for the correct stream?

--
Thanks and regards
Chinmay Soman
cannot be cast to java.lang.String
Hello,

I have managed to get Samza up and running a simple test job that just sends on the received message. This is the code:

    public class job1 implements StreamTask {
        private final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "beste");

        public void process(IncomingMessageEnvelope envelope,
                            MessageCollector collector,
                            TaskCoordinator coordinator) {
            String msg = (String)envelope.getMessage();
            String outmsg = msg;
            collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outmsg));
        }
    }

The properties file that runs it is this one:

    task.class=samzafroga.job1
    job.factory.class=org.apache.samza.job.local.ThreadJobFactory
    job.name=samzafroga.job1

    systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
    systems.kafka.consumer.zookeeper.connect=broker01:2181
    systems.kafka.producer.bootstrap.servers=broker01:9092

    task.inputs=kafka.syslog

    serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
    systems.kafka.streams.frogain.samza.msg.serde=json

When a message reaches the line containing the following cast:

    String msg = (String)envelope.getMessage();

I get an exception like this:

22 mar 2015 23:16:25 ERROR SamzaContainer - Caught exception in process loop.
java.lang.ClassCastException: [B cannot be cast to java.lang.String
        at samzafroga.job1.process(job1.java:19)
        at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:129)
        at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
        at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:128)
        at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:114)
        at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
        at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
        at org.apache.samza.container.RunLoop.process(RunLoop.scala:100)
        at org.apache.samza.container.RunLoop.run(RunLoop.scala:83)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:549)
        at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)

Is this a configuration issue? I tried changing the serializer to string, but it had the same effect.

Thanks,

    Jordi

________
Jordi Blasi Uribarri
Área I+D+i

jbl...@nextel.es
Oficina Bilbao

[http://www.nextel.es/wp-content/uploads/Firma_Nextel_2014.png]
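The "[B cannot be cast to java.lang.String" error means the envelope carries the raw Kafka payload as a byte array ("[B" is JVM notation for byte[]). The proper fix is a serde on the input stream, but until that is in place a defensive decode in the task avoids the cast. This is a sketch outside of Samza; the `decodeToString` helper is hypothetical, not a Samza API:

```java
import java.nio.charset.StandardCharsets;

public class DecodeSketch {
    // Without a message serde, Samza hands process() the raw Kafka payload
    // as byte[]. Casting that Object straight to String throws
    // ClassCastException; decode it explicitly instead.
    static String decodeToString(Object message) {
        if (message instanceof byte[]) {
            return new String((byte[]) message, StandardCharsets.UTF_8);
        }
        return (String) message; // already deserialized by a configured serde
    }

    public static void main(String[] args) {
        Object raw = "syslog line".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeToString(raw));
    }
}
```

Inside the task, `String msg = decodeToString(envelope.getMessage());` would then work whether or not a serde is configured, assuming UTF-8 input.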
RE: SamzaException: no job factory class defined
That was it, plus some more configuration that was missing:

    task.class=samzafroga.job1
    job.factory.class=org.apache.samza.job.local.ThreadJobFactory
    job.name=samzafroga.job1
    systems.kafka.producer.bootstrap.servers=broker01:9092

Now I am getting this exception:

Exception in thread "main" java.lang.NoClassDefFoundError: org/eclipse/jetty/server/Handler
        at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:56)
        at org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:39)
        at org.apache.samza.job.JobRunner.run(JobRunner.scala:62)
        at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
        at org.apache.samza.job.JobRunner.main(JobRunner.scala)
Caused by: java.lang.ClassNotFoundException: org.eclipse.jetty.server.Handler
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

What I don't understand is that the jetty jar is included in the job. Could it be a version problem? I am using jetty-util-6.1.26.jar.

Thanks,

    Jordi

-----Original Message-----
From: Roger Hoover [mailto:roger.hoo...@gmail.com]
Sent: Wednesday, 18 March 2015 16:52
To: dev@samza.apache.org
Subject: Re: SamzaException: no job factory class defined

Hi Jordi,

I think you need to add the "job.factory.class" property:

http://samza.apache.org/learn/documentation/0.8/jobs/configuration-table.html

    # An example
    job.factory.class=org.apache.samza.job.local.ThreadJobFactory

Cheers,

Roger
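Jordi's guess about a version problem looks right: jetty-util-6.1.26 is a Jetty 6-era artifact, while `org.eclipse.jetty.server.Handler` lives in the newer `jetty-server` artifact. A hedged sketch of the dependency the job package would likely need (the version placeholder is an assumption; align it with whatever Jetty version your Samza build pulls in, e.g. via hello-samza's pom):

```xml
<!-- Assumption: the missing org.eclipse.jetty.server.Handler class ships in
     the jetty-server artifact, not jetty-util. ${jetty.version} is a
     placeholder; match it to the Jetty version Samza itself depends on. -->
<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-server</artifactId>
  <version>${jetty.version}</version>
</dependency>
```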
SamzaException: no job factory class defined
Hello,

I am trying to run my first job (it publishes what it receives) in Samza, and I think that all the dependencies were added by configuring the Maven repositories (solved in a recent question to the list). I am getting another exception from the job runner:

    #/opt/jobs# bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/job1.properties
    java version "1.7.0_75"
    OpenJDK Runtime Environment (IcedTea 2.5.4) (7u75-2.5.4-2)
    OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)
    /usr/lib/jvm/java-7-openjdk-amd64/bin/java -Dlog4j.configuration=file:bin/log4j-console.xml -Dsamza.log.dir=/opt/jobs -Djava.io.tmpdir=/opt/jobs/tmp -Xmx768M -XX:+PrintGCDateStamps -Xloggc:/opt/jobs/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024 -d64 -cp /opt/hadoop/conf:/opt/jobs/lib/samzafroga-0.0.1-SNAPSHOT.jar:/opt/jobs/lib/samzafroga-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.apache.samza.job.JobRunner --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file:///opt/jobs/job1.properties
    log4j: reset attribute= "false".
    log4j: Threshold ="null".
    log4j: Level value for root is [INFO].
    log4j: root level set to INFO
    log4j: Class name: [org.apache.log4j.ConsoleAppender]
    log4j: Parsing layout of class: "org.apache.log4j.PatternLayout"
    log4j: Setting property [conversionPattern] to [%d{dd MMM HH:mm:ss} %5p %c{1} - %m%n].
    log4j: Adding appender named [consoleAppender] to category [root].
    log4j: Class name: [org.apache.log4j.RollingFileAppender]
    log4j: Setting property [append] to [false].
    log4j: Setting property [file] to [out/learning.log].
    log4j: Parsing layout of class: "org.apache.log4j.PatternLayout"
    log4j: Setting property [conversionPattern] to [%d{ABSOLUTE} %-5p [%c{1}] %m%n].
    log4j: setFile called: out/learning.log, false
    log4j: setFile ended
    log4j: Adding appender named [fileAppender] to category [root].
    Exception in thread "main" org.apache.samza.SamzaException: no job factory class defined
            at org.apache.samza.job.JobRunner.run(JobRunner.scala:53)
            at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
            at org.apache.samza.job.JobRunner.main(JobRunner.scala)

My properties file is this:

    task.class=samzafroga.job1

    systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
    systems.kafka.consumer.zookeeper.connect=acio-broker01:2181,acio-broker02:2181
    task.inputs=kafka.frogain

    serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
    systems.kafka.streams.frogain.samza.msg.serde=json

This is the job code:

    package samzafroga;

    import org.apache.samza.config.Config;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskContext;
    import org.apache.samza.task.TaskCoordinator;

    public class job1 implements StreamTask {
        private final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "beste");

        public void process(IncomingMessageEnvelope envelope,
                            MessageCollector collector,
                            TaskCoordinator coordinator) {
            String msg = (String)envelope.getMessage();
            String outmsg = msg;
            collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outmsg));
        }
    }

I have been trying to read the code in JobRunner.scala, which is apparently the one generating the exception, but I am not really sure whether the problem is with the task.class definition or whether I still have something missing in the system.

Thanks in advance,

    Jordi
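Pulling together Roger's fix and the later messages in this thread, a minimal local-mode configuration would look something like the following. This is a sketch assembled from values quoted in the thread, not a verified file; check the keys against the configuration table for your Samza version:

```properties
# Job (job.factory.class was the missing property behind the SamzaException)
job.factory.class=org.apache.samza.job.local.ThreadJobFactory
job.name=samzafroga.job1

# Task
task.class=samzafroga.job1
task.inputs=kafka.frogain

# Kafka system
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=acio-broker01:2181,acio-broker02:2181
systems.kafka.producer.bootstrap.servers=broker01:9092

# Serde for the input stream
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
systems.kafka.streams.frogain.samza.msg.serde=json
```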
RE: NoSuchMethodError
I hope I am reading the right documentation. On this page:

http://samza.apache.org/learn/documentation/latest/jobs/job-runner.html

you can read:

    Samza jobs are started using a script called run-job.sh.

    samza-example/target/bin/run-job.sh \
      --config-factory=samza.config.factories.PropertiesConfigFactory \
      --config-path=file://$PWD/config/hello-world.properties

The way you describe it, it works. Now I have a different problem that I will check on before asking.

Thanks for your help.

Cheers,

    Jordi

-----Original Message-----
From: Chris Riccomini [mailto:criccom...@apache.org]
Sent: Tuesday, 17 March 2015 16:43
To: dev@samza.apache.org
Subject: Re: NoSuchMethodError

Hey Jordi,

PropertiesConfigFactory is located in this package: org.apache.samza.config.factories

You have the package set to samza.config.factories. You'll need to set it to:

    org.apache.samza.config.factories.PropertiesConfigFactory

Curious where you're getting that value from? We haven't had a "samza.*" prefix on packages since we open sourced Samza. What docs are you looking at?

Cheers,
Chris
Re: NoSuchMethodError
After doing what you told me, I am now including all the dependencies in a package. What I am seeing now is another ClassNotFoundException, but in this case it does not seem to be related to external libraries but to Samza itself, as it references the config factory.

    # bin/run-job.sh --config-factory=samza.config.factories.PropertiesConfigFactory --config-path=file://\$PWD/samzafroga.jar
    java version "1.7.0_75"
    OpenJDK Runtime Environment (IcedTea 2.5.4) (7u75-2.5.4-2)
    OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)
    /usr/lib/jvm/java-7-openjdk-amd64/bin/java -Dlog4j.configuration=file:bin/log4j-console.xml -Dsamza.log.dir=/opt/jobs -Djava.io.tmpdir=/opt/jobs/tmp -Xmx768M -XX:+PrintGCDateStamps -Xloggc:/opt/jobs/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024 -d64 -cp /root/.samza/conf:/opt/jobs/lib/samzafroga-0.0.1-SNAPSHOT.jar:/opt/jobs/lib/samzafroga-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.apache.samza.job.JobRunner --config-factory=samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/samzafroga.jar
    Exception in thread "main" java.lang.ClassNotFoundException: samza.config.factories.PropertiesConfigFactory
            at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
            at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
            at java.security.AccessController.doPrivileged(Native Method)
            at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
            at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
            at java.lang.Class.forName0(Native Method)
            at java.lang.Class.forName(Class.java:191)
            at org.apache.samza.util.CommandLine.loadConfig(CommandLine.scala:66)
            at org.apache.samza.job.JobRunner$.main(JobRunner.scala:36)
            at org.apache.samza.job.JobRunner.main(JobRunner.scala)

Do I have to include anything more?

Thanks,

    Jordi

>Hey Jordi,
>
>The stack trace you've pasted suggests that you're missing Scala in the
>classpath, or have a different version of Scala in the classpath than what
>Samza was compiled with.
>
>You should not be manually assembling the dependencies for your job. Your
>build system should be doing this for you. Please see hello-samza's pom.xml:
>
>  https://github.com/apache/samza-hello-samza/blob/master/pom.xml
>
>for an example of how to do this. Specifically, the "assembly" plugin in
>Maven is used to build a .tgz file for your job, which has all of its
>required components:
>
>  http://maven.apache.org/plugins/maven-assembly-plugin/
>
>If you're not using Maven, Gradle and SBT can both assemble .tgz files as
>well.
>
>Cheers,
>Chris
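Chris's point about letting the build assemble the job package can be sketched as a pom.xml fragment. The descriptor path here is an assumption for illustration; hello-samza's pom.xml, linked above, is the authoritative example:

```xml
<!-- Sketch: maven-assembly-plugin bound to the package phase, as Chris
     suggests, so the job archive carries its own dependencies.
     src/main/assembly/src.xml is an assumed descriptor path. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptors>
      <descriptor>src/main/assembly/src.xml</descriptor>
    </descriptors>
  </configuration>
  <executions>
    <execution>
      <id>make-assembly</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```

The assembly descriptor then lists which jars and scripts go into the .tgz, which replaces copying jars into /opt/jobs/lib by hand.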
NoSuchMethodError
Hello,

I am new to Samza and I am trying to test it. I have not found much documentation, and I am not sure whether this is the correct place for this kind of question; please let me know if I am in the wrong place. I have tried to follow the documentation, but I guess I missed something or did something wrong.

I have installed a clean Debian box and followed the instructions to download and build from git:

    git clone http://git-wip-us.apache.org/repos/asf/samza.git
    cd samza
    ./gradlew clean build

I have also installed Scala (2.9.2) and the Java 7 JDK and JRE.

I have created a simple job in Java and I am trying to run it, but I am seeing some Java dependency problems when I try to run both the run-job.sh and run-am.sh scripts.

What I have done is create a folder for the jobs in /opt/jobs. There I have created a bin folder for the scripts and a lib folder for all the jars that I found to be required (as I have seen in the scripts, this is the place they are loaded from). I have copied there all the jars contained in the Samza folders, plus the ones I obtained from a hadoop-2.6.0 installation package. Some of the dependencies have been resolved, but I am stuck on the following error when I run run-am.sh:

    Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
            at org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$main$3.apply(SamzaAppMaster.scala:63)
            at org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$main$3.apply(SamzaAppMaster.scala:63)
            at org.apache.samza.util.Logging$class.info(Logging.scala:55)
            at org.apache.samza.job.yarn.SamzaAppMaster$.info(SamzaAppMaster.scala:55)
            at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:63)
            at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)

What am I missing?

As a more general question, assembling the dependencies is taking quite a lot of work. Is there a reference list of the jar files needed for the jobs and scripts to run correctly?

Thanks for your help,

    Jordi