Re: Review Request 36006: Writing a tool to read from the coordinator stream and react to config changes accordingly.

2015-08-03 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36006/#review94005
---



samza-autoscaling/src/main/java/org/apache/samza/autoScaling/utils/YarnUtil.java
 (line 68)


If the application ID is not found, it returns null. Please add this to the
doc.
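A minimal sketch of the kind of Javadoc being requested, assuming a lookup method that maps an application name to its ID. The class YarnUtilSketch, the method getAppIdByName, and the in-memory map are hypothetical stand-ins. The actual YarnUtil method at line 68 queries YARN and is not shown in this review.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for YarnUtil: the real class queries the YARN ResourceManager,
// while this sketch keeps application IDs in an in-memory map.
final class YarnUtilSketch {
  private final Map<String, String> appIdsByName = new HashMap<>();

  void register(String appName, String appId) {
    appIdsByName.put(appName, appId);
  }

  /**
   * Returns the YARN application ID of the running job with the given name.
   *
   * @param appName the job's application name
   * @return the application ID, or {@code null} if no running application with
   *         that name is found; callers must check for {@code null}
   */
  String getAppIdByName(String appName) {
    return appIdsByName.get(appName); // Map.get yields null for an unknown name
  }
}
```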



samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala (line 52)


Can you please add a one-line comment about the 
shouldRewriteConfigToCoordinatorStream variable? Someone without the context of 
ConfigManager may not understand the motivation behind it.


Overall, looks good. Please address Yi's comments as well and put up a patch.

- Navina Ramesh


On July 28, 2015, 2:39 a.m., Shadi A. Noghabi wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36006/
> ---
> 
> (Updated July 28, 2015, 2:39 a.m.)
> 
> 
> Review request for samza, Yi Pan (Data Infrastructure), Navina Ramesh, and 
> Naveen Somasundaram.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> After a job is submitted, it may need configuration changes; specifically,
> it may need more containers. In SAMZA-704 a tool is being added to write to
> the coordinator stream (CoordinatorStreamWriter). This tool can be used to
> write new configurations to the coordinator stream. However, another tool
> (ConfigManager) is needed to read the config changes and react to them,
> which is the goal of this task. This tool should be started after the job
> is submitted; it reads any config changes added to the coordinator stream
> and reacts to each accordingly.
> 
> This tool, called the Config Manager, focuses on handling container
> changes by reacting to set-config messages with key "yarn.container.count".
> 
> The config manager is a separate standalone module that should be started
> separately after a job is submitted. Therefore, you have to add two
> configurations to the input config file:
> 1. yarn.rm.address= 
> 2. yarn.rm.port= 
> 
> The config manager periodically polls the coordinator stream to see if
> there are any new messages. The polling period is 100 ms by default;
> it can be configured by adding
> configManager.polling.interval= to the input config file.
> Thus, overall, the command to run the config manager along with the job
> would be:
> 
> 
> /bin/run-config-manager.sh --config-factory=<config factory> --config-path=<config path>
> 
> 
> Diffs
> -
> 
>   build.gradle 0852adc4e8e0c2816afd1ebf433f1af6b44852f7 
>   checkstyle/import-control.xml 6654319392929857bb861d77763afd8a5ea7674c 
>   gradle/dependency-versions.gradle fb06e8ed393d1a38abfa1a48fe5244fc7f6c7339 
>   
> samza-autoscaling/src/main/java/org/apache/samza/autoScaling/deployer/ConfigManager.java
>  PRE-CREATION 
>   
> samza-autoscaling/src/main/java/org/apache/samza/autoScaling/utils/YarnUtil.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
>  b1078bdf7bddd16c9ccc6559b9efd40ca5ae67bc 
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 
> 1c178a661e449c6bdfc4ce431aef9bb2d261a6c2 
>   samza-shell/src/main/bash/run-config-manager.sh PRE-CREATION 
>   
> samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
>  ea702a919348305ff95ce0b4ca1996a13aff04ec 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
>  ce88698c12c4bf6f4cf128f92d60b0b9496997d7 
>   settings.gradle 19bff971ad221084dac10d3f7f3facfa42b829a7 
> 
> Diff: https://reviews.apache.org/r/36006/diff/
> 
> 
> Testing
> ---
> 
> Tested with hello-samza; it works properly.
> 
> 
> Thanks,
> 
> Shadi A. Noghabi
> 
>
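The polling behavior described in the request can be sketched roughly as follows. This is a hypothetical illustration, not the patch's ConfigManager. SetConfigMessage, the Hooks interface, and its poll() and onContainerCountChange() methods are stand-ins for reading from the coordinator stream and reacting to a change. Only the key names (yarn.container.count, configManager.polling.interval) and the 100 ms default come from the description.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a set-config message read from the coordinator stream.
final class SetConfigMessage {
  final String key;
  final String value;

  SetConfigMessage(String key, String value) {
    this.key = key;
    this.value = value;
  }
}

final class ConfigManagerSketch {
  static final String CONTAINER_COUNT_KEY = "yarn.container.count";
  static final String POLLING_INTERVAL_KEY = "configManager.polling.interval";
  static final long DEFAULT_POLLING_INTERVAL_MS = 100L;

  // Hypothetical hooks: poll() returns the messages that arrived since the last call,
  // onContainerCountChange() performs whatever reaction the real tool implements.
  interface Hooks {
    List<SetConfigMessage> poll();

    void onContainerCountChange(int containerCount);
  }

  // Reads the polling interval from the input config, defaulting to 100 ms.
  static long pollingIntervalMs(Map<String, String> config) {
    String raw = config.get(POLLING_INTERVAL_KEY);
    return raw == null ? DEFAULT_POLLING_INTERVAL_MS : Long.parseLong(raw.trim());
  }

  // Returns the last yarn.container.count value in the batch, or -1 if the batch
  // has no such message. Messages with other keys are ignored.
  static int latestContainerCount(List<SetConfigMessage> batch) {
    int latest = -1;
    for (SetConfigMessage msg : batch) {
      if (CONTAINER_COUNT_KEY.equals(msg.key)) {
        latest = Integer.parseInt(msg.value.trim());
      }
    }
    return latest;
  }

  // Polls for a fixed number of rounds (a real tool would loop until shut down),
  // sleeping for the configured interval between rounds.
  static void run(Map<String, String> config, Hooks hooks, int rounds) {
    long intervalMs = pollingIntervalMs(config);
    for (int i = 0; i < rounds; i++) {
      int requested = latestContainerCount(hooks.poll());
      if (requested > 0) {
        hooks.onContainerCountChange(requested);
      }
      try {
        Thread.sleep(intervalMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt and stop polling
        return;
      }
    }
  }
}
```

If several yarn.container.count messages arrive in one poll, the sketch acts only on the last one, on the assumption that the newest setting wins.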



RE: testThreadInterruptInOperationSleep on clean installation

2015-08-03 Thread Jordi Blasi Uribarri
Hi,

There was no Gradle process running, and I have tried running the build again
with a higher log level. It seems that the failing test is the one related to
RocksDB.

How can I run the test in Eclipse?

This is what I get:

Executing task ':samza-kv-rocksdb_2.10:sourcesJar' (up-to-date check took 0.005 
secs) due to:
  Output file 
/opt/samza/samza-kv-rocksdb/build/libs/samza-kv-rocksdb_2.10-0.10.0-SNAPSHOT-sources.jar
 has changed.
  Output file 
/opt/samza/samza-kv-rocksdb/build/libs/samza-kv-rocksdb_2.10-0.10.0-SNAPSHOT-sources.jar
 has been removed.
:samza-kv-rocksdb_2.10:sourcesJar (Thread[Daemon,5,main]) completed. Took 0.019 
secs.
:samza-kv-rocksdb_2.10:signArchives (Thread[Daemon,5,main]) started.
:samza-kv-rocksdb_2.10:signArchives
Skipping task ':samza-kv-rocksdb_2.10:signArchives' as task onlyIf is false.
:samza-kv-rocksdb_2.10:signArchives SKIPPED
:samza-kv-rocksdb_2.10:signArchives (Thread[Daemon,5,main]) completed. Took 
0.003 secs.
:samza-kv-rocksdb_2.10:assemble (Thread[Daemon,5,main]) started.
:samza-kv-rocksdb_2.10:assemble
Skipping task ':samza-kv-rocksdb_2.10:assemble' as it has no actions.
:samza-kv-rocksdb_2.10:assemble (Thread[Daemon,5,main]) completed. Took 0.001 
secs.
:samza-kv-rocksdb_2.10:compileTestJava (Thread[Daemon,5,main]) started.
:samza-kv-rocksdb_2.10:compileTestJava
Skipping task ':samza-kv-rocksdb_2.10:compileTestJava' as it has no source 
files.
:samza-kv-rocksdb_2.10:compileTestJava UP-TO-DATE
:samza-kv-rocksdb_2.10:compileTestJava (Thread[Daemon,5,main]) completed. Took 
0.001 secs.
:samza-kv-rocksdb_2.10:compileTestScala (Thread[Daemon,5,main]) started.
:samza-kv-rocksdb_2.10:compileTestScala
Executing task ':samza-kv-rocksdb_2.10:compileTestScala' (up-to-date check took 
0.068 secs) due to:
  Output file /opt/samza/samza-kv-rocksdb/build/classes/test has changed.
  Output file 
/opt/samza/samza-kv-rocksdb/build/classes/test/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.class
 has been removed.
  Output file 
/opt/samza/samza-kv-rocksdb/build/classes/test/org/apache/samza/storage/kv/TestRocksDbKeyValueStore$$anonfun$testTTL$2.class
 has been removed.
Compiling with Ant scalac task.
[ant:scalac] Compiling 1 source file to 
/opt/samza/samza-kv-rocksdb/build/classes/test
[ant:scalac] Element '/opt/samza/samza-kv-rocksdb/build/resources/main' does 
not exist.
:samza-kv-rocksdb_2.10:compileTestScala (Thread[Daemon,5,main]) completed. Took 
1.158 secs.
:samza-kv-rocksdb_2.10:processTestResources (Thread[Daemon,5,main]) started.
:samza-kv-rocksdb_2.10:processTestResources
Skipping task ':samza-kv-rocksdb_2.10:processTestResources' as it has no source 
files.
:samza-kv-rocksdb_2.10:processTestResources UP-TO-DATE
:samza-kv-rocksdb_2.10:processTestResources (Thread[Daemon,5,main]) completed. 
Took 0.003 secs.
:samza-kv-rocksdb_2.10:testClasses (Thread[Daemon,5,main]) started.
:samza-kv-rocksdb_2.10:testClasses
Skipping task ':samza-kv-rocksdb_2.10:testClasses' as it has no actions.
:samza-kv-rocksdb_2.10:testClasses (Thread[Daemon,5,main]) completed. Took 
0.001 secs.
:samza-kv-rocksdb_2.10:test (Thread[Daemon,5,main]) started.
:samza-kv-rocksdb_2.10:test
Executing task ':samza-kv-rocksdb_2.10:test' (up-to-date check took 0.11 secs) 
due to:
  No history is available.
Starting process 'Gradle Test Executor 16'. Working directory: 
/opt/samza/samza-kv-rocksdb Command: /usr/lib/jvm/java-7-openjdk-amd64/bin/java 
-Djava.security.manager=jarjar.org.gradle.process.internal.child.BootstrapSecurityManager
 -Dfile.encoding=UTF-8 -ea -cp 
/root/.gradle/caches/2.0/workerMain/gradle-worker.jar 
jarjar.org.gradle.process.internal.launcher.GradleWorkerMain
Successfully started process 'Gradle Test Executor 16'
Gradle Test Executor 16 started executing tests.

org.apache.samza.storage.kv.TestRocksDbKeyValueStore > testTTL STANDARD_ERROR
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.
/usr/lib/jvm/java-7-openjdk-amd64/bin/java: symbol lookup error: 
/tmp/librocksdbjni1871304830042440508..so: undefined symbol: clock_gettime
:samza-kv-rocksdb_2.10:test FAILED
:samza-kv-rocksdb_2.10:test (Thread[Daemon,5,main]) completed. Took 2.36 secs.

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':samza-kv-rocksdb_2.10:test'.
> Process 'Gradle Test Executor 16' finished with non-zero exit value 127

Thanks,

Jordi

-Original Message-
From: Yan Fang [mailto:yanfang...@gmail.com]
Sent: Tuesday, August 4, 2015 0:29
To: dev@samza.apache.org
Subject: Re: testThreadInterruptInOperationSleep on clean installation

Hi Jordi,

Those two exceptions seem to be caused by a race condition. Since I cannot
reproduce it, can you try to 1) kill all the GradleDaemon and GradleWrapperMain
processes before you rerun the build? 2) run those two tests in
Eclipse (or some

Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

2015-08-03 Thread Yan Fang


> On July 25, 2015, 1:33 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala,
> >  line 126
> > 
> >
> > Add logs for the case where the topic already exists. Log the
> > metadata information (like the original createStream code does).
> 
> Robert Zuljevic wrote:
> This is already done via KafkaSystemAdmin's createChangelogStream method.
> Do you want me to not call this method there, but rather call it in
> JobCoordinator, right after creation?

No, they are different. What I mean is to add an "additional" log here,
something like:

    val changeLogMetadata = streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet)
    info("Got change log stream metadata: %s" format changeLogMetadata)

The goal is to make sure we create the correct changelog in the AM.


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93019
---


On July 9, 2015, 2:39 p.m., Robert Zuljevic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36163/
> ---
> 
> (Updated July 9, 2015, 2:39 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Removed trailing whitespaces
> 
> 
> Diffs
> -
> 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
> 7a588ebc99b5f07d533e48e10061a3075a63665a 
>   
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
>  249b8ae3a904716ea51a2b27c7701ac30d13b854 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> aeba61a95371faaba23c97d896321b8d95467f87 
>   
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
>  097f41062f3432ae9dc9a9737b48ed7b2f709f20 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
> 8d54c4639fc226b34e64915935c1d90e5917af2e 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
>  d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
>  35086f54f526d5d88ad3bc312b71fce40260e7c6 
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
> b063366f0f60e401765a000fa265c59dee4a461e 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
>  1e936b42a5b9a4bfb43766c17847b2947ebdb21d 
> 
> Diff: https://reviews.apache.org/r/36163/diff/
> 
> 
> Testing
> ---
> 
> I wasn't really sure what kind of test (unit test / integration test) I
> should write here, so any pointers would be greatly appreciated! I tested the
> change with the existing unit and integration tests.
> 
> 
> Thanks,
> 
> Robert Zuljevic
> 
>



Re: samza environment variable on containers

2015-08-03 Thread Yan Fang
Maybe @Eli Reisman can give you some insight, since he is writing the HDFS
producer and the exception looks related to the HDFS consumer.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Sun, Aug 2, 2015 at 12:41 PM, Chen Song  wrote:

> Thanks Yan.
>
> I ran into issues when testing jobs on a Kerberized cluster. The job reads
> from HDFS and worked well before. After moving to the Kerberized cluster,
> the Samza container threw the exception below. I am not sure what Kerberos
> has to do with this.
>
> java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
>     at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
>     at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
>     at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:192)
>     at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:176)
>     at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1916)
>     at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1811)
>     at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1760)
>     at com.appnexus.data.samza.hdfs.HdfsProtobufSequenceFileReaderWriter.messages(HdfsProtobufSequenceFileReaderWriter.scala:20)
>     at com.appnexus.data.samza.systems.HdfsSystemConsumer$$anonfun$poll$1$$anonfun$1.apply(HdfsSystemConsumer.scala:84)
>     at com.appnexus.data.samza.systems.HdfsSystemConsumer$$anonfun$poll$1$$anonfun$1.apply(HdfsSystemConsumer.scala:76)
>
> After googling a lot, I stumbled upon this thread:
>
> http://stackoverflow.com/questions/22150417/hadoop-mapreduce-java-lang-unsatisfiedlinkerror-org-apache-hadoop-util-nativec
>
> If anyone has any thoughts on this error, please advise.
>
> Chen
>
> On Thu, Jul 30, 2015 at 5:15 PM, Yan Fang  wrote:
>
> > Hi Chen Song,
> >
> > I do not think there is a way in Samza to specify environment variables
> > for the Samza container.
> >
> > Currently Samza does not read LD_LIBRARY_PATH either.
> >
> > Samza only puts the files in lib/*.[jw]ar on the CLASSPATH.
> >
> > Though -Djava.library.path might work, it will cause Hadoop errors. :(
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Thu, Jul 30, 2015 at 7:05 AM, Chen Song 
> wrote:
> >
> > > Maybe a dumb question.
> > >
> > > Is there a way to set an ENV for samza containers?
> > >
> > > We want to set LD_LIBRARY_PATH to include hadoop native libs.
> > >
> > > --
> > > Chen Song
> > >
> >
>
>
>
> --
> Chen Song
>
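To check from inside a container which directories the JVM searches for native libraries, and whether libhadoop is among them, a small diagnostic along these lines may help. It relies only on the standard java.library.path system property and System.mapLibraryName. The class name NativeLibCheck and the example directory /opt/hadoop/lib/native are assumptions for illustration.

```java
import java.io.File;

final class NativeLibCheck {
  // True if dir is one of the entries of a java.library.path-style string.
  // java.io.File normalizes duplicate and trailing separators before comparing.
  static boolean pathContains(String libraryPath, String dir) {
    if (libraryPath == null || libraryPath.isEmpty()) {
      return false;
    }
    File target = new File(dir);
    for (String entry : libraryPath.split(File.pathSeparator)) {
      if (!entry.isEmpty() && new File(entry).equals(target)) {
        return true;
      }
    }
    return false;
  }

  // True if the platform file name for libName (e.g. libhadoop.so on Linux)
  // exists in any directory of the given library path.
  static boolean libraryFileVisible(String libraryPath, String libName) {
    if (libraryPath == null || libraryPath.isEmpty()) {
      return false;
    }
    String fileName = System.mapLibraryName(libName);
    for (String entry : libraryPath.split(File.pathSeparator)) {
      if (!entry.isEmpty() && new File(entry, fileName).isFile()) {
        return true;
      }
    }
    return false;
  }

  // Prints what the current JVM sees; run it with the same options as the container.
  public static void main(String[] args) {
    String libraryPath = System.getProperty("java.library.path");
    System.out.println("java.library.path = " + libraryPath);
    System.out.println("LD_LIBRARY_PATH   = " + System.getenv("LD_LIBRARY_PATH"));
    System.out.println("libhadoop found   = " + libraryFileVisible(libraryPath, "hadoop"));
  }
}
```

The output only shows what the JVM can see. It does not tell whether a libhadoop found there matches the Hadoop version on the classpath.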


Re: testThreadInterruptInOperationSleep on clean installation

2015-08-03 Thread Yan Fang
Hi Jordi,

Those two exceptions seem to be caused by a race condition. Since I cannot
reproduce it, can you try to 1) kill all the GradleDaemon and
GradleWrapperMain processes before you rerun the build? 2) run those two
tests in Eclipse (or some other way) without Gradle? I suspect both are
related to Gradle.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Sun, Aug 2, 2015 at 11:45 PM, Jordi Blasi Uribarri 
wrote:

> Hi,
>
> I am trying to do a clean installation of Samza on a newly installed
> Debian 7.8 box. Following the steps I collected during a previous Samza 0.8.2
> installation, I performed the following steps:
>
> apt-get install openjdk-7-jdk openjdk-7-jre git maven curl
> vi /root/.bashrc
> export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
> export CLASSPATH=$CLASSPATH:/usr/share/java
>
> cd /opt
> git clone http://git-wip-us.apache.org/repos/asf/samza.git
> cd samza
> ./gradlew clean build
>
>
> Every time I run it, I get an error in one of the tests the build runs:
> testThreadInterruptInOperationSleep
> java.lang.AssertionError: expected:<1> but was:<0>
>     at org.junit.Assert.fail(Assert.java:91)
>     at org.junit.Assert.failNotEquals(Assert.java:645)
>     at org.junit.Assert.assertEquals(Assert.java:126)
>     at org.junit.Assert.assertEquals(Assert.java:470)
>     at org.junit.Assert.assertEquals(Assert.java:454)
>     at org.apache.samza.util.TestExponentialSleepStrategy.testThreadInterruptInOperationSleep(TestExponentialSleepStrategy.scala:158)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:76)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69)
>     at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>     at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>     at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>     at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>     at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>     at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:355)
>     at org.gradle.internal.concurrent

Re: Keep samza container logs from being deleted

2015-08-03 Thread Navina Ramesh
Hi Chen,
You can set the yarn.nodemanager.delete.debug-delay-sec config in
yarn-site.xml in order to retain the logs after the job crashes. Refer to
https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-common/yarn-default.xml

(This setting is intended for debugging purposes.)

If you need to retain logs for a longer period of time for purposes other than
debugging, you should look into YARN's log aggregation features.
Alternatively, you can use Samza's StreamAppender to send the log data
to a stream, which you can then publish to ELK for further lookup. It really
depends on your requirements.
Please note that the StreamAppender has a bug in the master branch that is
pending resolution in https://issues.apache.org/jira/browse/SAMZA-723

Cheers!
Navina


On Mon, Aug 3, 2015 at 2:26 PM, Chen Song  wrote:

> Dumb question.
>
> When running a Samza job on YARN, I found that the container logs were gone
> after a few days. Is there a YARN config to keep the logs from being deleted
> for a long-lived streaming job?
>
> --
> Chen Song
>



-- 
Navina R.


Re: Review Request 37039: SAMZA-748 Coordinator URL always 127.0.0.1

2015-08-03 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37039/#review93971
---



samza-core/src/main/scala/org/apache/samza/util/Util.scala (line 312)


The result is not correct; it returns something like
fe80:0:0:0:3e15:c2ff:feca:f0fc%4

Also, we need to test whether the address is site-local (isSiteLocalAddress).
Sometimes more than one address is site-local.

I looked at Spark's code for reference:


https://github.com/apache/spark/blob/6b2baec04fa3d928f0ee84af8c2723ac03a4648c/core/src/main/scala/org/apache/spark/util/Utils.scala#L835-L874

This code appears to return the desired result.

Thank you.


- Yan Fang


On Aug. 3, 2015, 1:44 p.m., József Márton Jung wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37039/
> ---
> 
> (Updated Aug. 3, 2015, 1:44 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> We are using InetAddress.getLocalHost().getHostAddress() for the 
> org.apache.samza.coordinator.server.HttpServer#getUrl. But getLocalHost() may 
> return a loopback address, 127.0.0.1, which is not reachable by other 
> machines.
> 
> Added a method to Util.scala which resolves the first network address which 
> is not a loopback address.
> 
> 
> Diffs
> -
> 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
>  e5ab4fb 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 27b2517 
>   
> samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
>  dfe3a45 
>   samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala de45123 
>   
> samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
>  dcf0435 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 419452c 
>   
> samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java
>  cbf552c 
> 
> Diff: https://reviews.apache.org/r/37039/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> József Márton Jung
> 
>
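One possible shape for the address selection discussed in this review, loosely following the idea of the referenced Spark code rather than copying it. It falls back to scanning network interfaces only when getLocalHost() is a loopback address. It skips loopback and link-local addresses, which excludes fe80:... results like the one Yan reported, and prefers an IPv4 site-local address. The class and method names are hypothetical. When several site-local addresses exist, this sketch simply takes the first in enumeration order, which may not be the one other machines should use.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

final class LocalAddressSketch {
  // Picks the first IPv4 site-local address (10/8, 172.16/12, 192.168/16); failing
  // that, the first other IPv4 address; failing that, the first IPv6 address.
  // Loopback and link-local addresses (127.x, ::1, 169.254.x, fe80::) are skipped.
  // Returns null if no candidate qualifies.
  static InetAddress pickAddress(List<InetAddress> candidates) {
    InetAddress firstIpv4 = null;
    InetAddress firstIpv6 = null;
    for (InetAddress addr : candidates) {
      if (addr.isLoopbackAddress() || addr.isLinkLocalAddress()) {
        continue;
      }
      if (addr instanceof Inet4Address) {
        if (addr.isSiteLocalAddress()) {
          return addr;
        }
        if (firstIpv4 == null) {
          firstIpv4 = addr;
        }
      } else if (firstIpv6 == null) {
        firstIpv6 = addr;
      }
    }
    return firstIpv4 != null ? firstIpv4 : firstIpv6;
  }

  // Uses getLocalHost() unless it is a loopback address; otherwise scans the
  // addresses of every interface that is up. Falls back to the loopback address.
  static InetAddress findLocalAddress() throws SocketException, UnknownHostException {
    InetAddress localHost = InetAddress.getLocalHost();
    if (!localHost.isLoopbackAddress()) {
      return localHost;
    }
    List<InetAddress> candidates = new ArrayList<>();
    Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
    if (nics != null) { // may be null when no interfaces are found
      for (NetworkInterface nic : Collections.list(nics)) {
        if (nic.isUp()) {
          candidates.addAll(Collections.list(nic.getInetAddresses()));
        }
      }
    }
    InetAddress picked = pickAddress(candidates);
    return picked != null ? picked : localHost;
  }

  // Parses an IP literal; getByName performs no DNS lookup for literals.
  static InetAddress literal(String ip) {
    try {
      return InetAddress.getByName(ip);
    } catch (UnknownHostException e) {
      throw new IllegalArgumentException("not an IP literal: " + ip, e);
    }
  }
}
```

HttpServer#getUrl could then use findLocalAddress().getHostAddress(). If an IPv6 address is returned, getHostAddress() may include a %scope suffix, and the URL would need brackets around the address.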



Keep samza container logs from being deleted

2015-08-03 Thread Chen Song
Dumb question.

When running a Samza job on YARN, I found that the container logs were gone
after a few days. Is there a YARN config to keep the logs from being deleted
for a long-lived streaming job?

-- 
Chen Song


Re: Dynamically assigning output topic

2015-08-03 Thread Karthik Sriram
I believe you only need to declare the system you are writing to
beforehand, i.e. your Kafka broker list if you are using Kafka. If topic
auto-creation is enabled, you can dynamically write to any topic within a
predefined system. I'd suggest simply using a hash map from topic name to
output stream object, creating a new output stream object when you need a
new topic and reusing the existing one otherwise.

Input topics, though, are predefined, but you could use a regex.
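A minimal sketch of the hash-map approach described above, written in plain Java 8 so it stands alone. In a Samza task, the cached value would be an org.apache.samza.system.SystemStream created as new SystemStream("kafka", topic) and passed to collector.send(new OutgoingMessageEnvelope(stream, message)). The system name "kafka", the class TopicStreamCache, and any topic-choosing logic are assumptions for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Caches one output-stream handle per topic name and creates it on first use.
// In a Samza task, S would be org.apache.samza.system.SystemStream and the factory
// topic -> new SystemStream("kafka", topic); process() would then call
// collector.send(new OutgoingMessageEnvelope(cache.forTopic(topic), message)).
final class TopicStreamCache<S> {
  private final Map<String, S> streams = new ConcurrentHashMap<>();
  private final Function<String, S> factory;

  TopicStreamCache(Function<String, S> factory) {
    this.factory = factory;
  }

  // Returns the cached handle for topic, creating it via the factory if absent.
  S forTopic(String topic) {
    return streams.computeIfAbsent(topic, factory);
  }

  int size() {
    return streams.size();
  }
}
```

Because SystemStream is a small value object, the cache mainly avoids repeated allocations; creating a new SystemStream per message would also work. On Java 7, a get followed by putIfAbsent achieves the same effect.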
On Aug 3, 2015 4:32 AM, "Jordi Blasi Uribarri"  wrote:

> Hi,
>
> Is there a way to dynamically assign, in the job code, the topic that the
> output messages should be sent to? I am testing an idea that needs to
> decide the destination topic programmatically (and not from a limited
> range). As I understand it, I need to declare the output format in the
> job properties file, so I need to know the topic name. Is there a way to
> remove this dependency?
>
> Thanks.
>
>Jordi
> 
> Jordi Blasi Uribarri
> R&D&I Department
>
> jbl...@nextel.es
> Bilbao Office
>
>


Review Request 37039: SAMZA-748 Coordinator URL always 127.0.0.1

2015-08-03 Thread József Márton Jung

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37039/
---

Review request for samza.


Repository: samza


Description
---

We are using InetAddress.getLocalHost().getHostAddress() for the 
org.apache.samza.coordinator.server.HttpServer#getUrl. But getLocalHost() may 
return a loopback address, 127.0.0.1, which is not reachable by other machines.

Added a method to Util.scala which resolves the first network address which is 
not a loopback address.


Diffs
-

  
samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
 e5ab4fb 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
27b2517 
  
samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala 
dfe3a45 
  samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala de45123 
  
samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
 dcf0435 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala 419452c 
  
samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java
 cbf552c 

Diff: https://reviews.apache.org/r/37039/diff/


Testing
---


Thanks,

József Márton Jung



Dynamically assigning output topic

2015-08-03 Thread Jordi Blasi Uribarri
Hi,

Is there a way to dynamically assign, in the job code, the topic that the output
messages should be sent to? I am testing an idea that needs to decide the
destination topic programmatically (and not from a limited range). As I
understand it, I need to declare the output format in the job properties
file, so I need to know the topic name. Is there a way to remove this
dependency?

Thanks.

   Jordi

Jordi Blasi Uribarri
R&D&I Department

jbl...@nextel.es
Bilbao Office



Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

2015-08-03 Thread Robert Zuljevic


> On July 25, 2015, 1:33 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala,
> >  line 126
> > 
> >
> > Add logs for the case where the topic already exists. Log the
> > metadata information (like the original createStream code does).

This is already done via KafkaSystemAdmin's createChangelogStream method. Do
you want me to not call this method there, but rather call it in
JobCoordinator, right after creation?


- Robert


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93019
---

