Re: [VOTE] SEP-1: Semantics of ProcessorId in Samza

2017-03-29 Thread Yan Fang
+1 . Thanks for the proposal, Navina. :)

Fang, Yan
yanfang...@gmail.com

On Thu, Mar 30, 2017 at 4:24 AM, Prateek Maheshwari <
pmaheshw...@linkedin.com.invalid> wrote:

> +1 (non binding) from me.
>
> - Prateek
>
> On Tue, Mar 28, 2017 at 2:17 PM, Boris S  wrote:
>
> > +1 Looks good to me.
> >
> > On Tue, Mar 28, 2017 at 2:00 PM, xinyu liu 
> wrote:
> >
> > > +1 on my side. Very happy to see this proposal. This is a blocker for
> > > integrating fluent API with StreamProcessor, and hopefully we can get
> it
> > > resolved soon :).
> > >
> > > Thanks,
> > > Xinyu
> > >
> > > On Tue, Mar 28, 2017 at 11:28 AM, Navina Ramesh (Apache) <
> > > nav...@apache.org>
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > This is a voting thread for SEP-1: Semantics of ProcessorId in Samza.
> > > > For reference, here is the wiki link:
> > > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-
> > > > 1%3A+Semantics+of+ProcessorId+in+Samza
> > > >
> > > > Link to discussion mail thread:
> > > > http://mail-archives.apache.org/mod_mbox/samza-dev/201703.
> > > > mbox/%3CCANazzuuHiO%3DvZQyFbTiYU-0Sfh3riK%3Dz4j_
> > > AdCicQ8rBO%3DXuYQ%40mail.
> > > > gmail.com%3E
> > > >
> > > > Please vote on this SEP asap. :)
> > > >
> > > > Thanks!
> > > > Navina
> > > >
> > >
> >
>


Re: [DISCUSS] Moving to github/pull-request for code review and check-in

2016-02-19 Thread Yan Fang
+1.

Though I am familiar with the current way, I still think pull requests
are simpler.

Cheers,

Fang, Yan
yanfang...@gmail.com

On Fri, Feb 19, 2016 at 11:10 AM, Milinda Pathirage 
wrote:

> +1. Calcite uses pull requests for contributions from non-committers and
> according to my experience with Calcite, pull requests are easier than the
> current approach we follow in Samza.
>
> Milinda
>
> On Thu, Feb 18, 2016 at 9:09 PM, Roger Hoover 
> wrote:
>
> > +1 - Thanks for bringing this up, Yi.  I've done it both ways and feel
> > pull requests are much easier.
> >
> > Sent from my iPhone
> >
> > > On Feb 18, 2016, at 4:25 PM, Navina Ramesh
> 
> > wrote:
> > >
> > > +1
> > >
> > > Haven't tried any contribution with pull requests. But sounds simpler
> > than
> > > attaching the patch to JIRA.
> > >
> > > Navina
> > >
> > >> On Thu, Feb 18, 2016 at 4:01 PM, Jacob Maes 
> > wrote:
> > >>
> > >> +1
> > >>
> > >> As a relatively new contributor to Samza, I've certainly felt the
> > current
> > >> process was overly-complicated.
> > >>
> > >>> On Thu, Feb 18, 2016 at 3:53 PM, Yi Pan  wrote:
> > >>>
> > >>> Hi, all,
> > >>>
> > >>> I want to start the discussion on our code review/commit process.
> > >>>
> > >>> I felt that our code review and check-in process is a little bit
> > >>> cumbersome:
> > >>> - developers need to create RBs and attach diff to JIRA
> > >>> - committers need to review RBs, dowload diff and apply, then push.
> > >>>
> > >>> It would be much lighter if we take the pull request only approach,
> as
> > >>> Kafka already converted to:
> > >>> - for the developers, the only thing needed is to open a pull
> request.
> > >>> - for committers, review and apply patch is from the same PR and
> merge
> > >> can
> > >>> be done directly on remote git repo.
> > >>>
> > >>> Of course, there might be some hookup scripts that we will need to
> link
> > >>> JIRA w/ pull request in github, which Kafka already does. Any
> comments
> > >> and
> > >>> feedbacks are welcome!
> > >>>
> > >>> Thanks!
> > >>>
> > >>> -Yi
> > >
> > >
> > >
> > > --
> > > Navina R.
> >
>
>
>
> --
> Milinda Pathirage
>
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
>
> twitter: milindalakmal
> skype: milinda.pathirage
> blog: http://milinda.pathirage.org
>


Re:Re: Samza processing reference data

2015-10-28 Thread Yan Fang


* Is there a tentative date for 0.10.0 release?
I think it's coming out soon. @Yi Pan , he should know more about that.


* I checked the checkpoint topic for Samza job and it seems the checkpoint 
topic is created with 1 partition by default. Given that each Samza task will 
need to read from checkpoint topic, it is similar to what I need to read (Each 
Samza task is reading from the same partition of a topic). I am wondering how 
is that achieved?
In the current implementation, only the AM reads the checkpoint stream and 
distributes the information to all the nodes using the HTTP server. The other 
nodes do not consume the checkpoint stream themselves. Correct me if I am wrong.
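For reference, the checkpoint topic discussed here comes from configuration along these lines (factory and property names per the Samza docs of that era; the replication factor is only illustrative):

```properties
# Store checkpoints in Kafka; Samza creates the checkpoint topic itself,
# with a single partition, which is why only the AM needs to read it.
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.checkpoint.replication.factor=3
```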


Thanks,
Yan






At 2015-10-28 02:49:23, "Chen Song"  wrote:
>Thanks Yan.
>
>* Is there a tentative date for 0.10.0 release?
>* I checked the checkpoint topic for Samza job and it seems the checkpoint
>topic is created with 1 partition by default. Given that each Samza task
>will need to read from checkpoint topic, it is similar to what I need to
>read (Each Samza task is reading from the same partition of a topic). I am
>wondering how is that achieved?
>
>Chen
>
>On Sat, Oct 24, 2015 at 5:52 AM, Yan Fang  wrote:
>
>> Hi Chen Song,
>>
>>
>> Sorry for the late reply. What you describe is a typical bootstrap use
>> case. Check
>> http://samza.apache.org/learn/documentation/0.9/container/streams.html ,
>> the bootstrap configuration. By using this one, Samza will always read the
>> *topicR* from the beginning when it restarts. And then it treats the
>> *topicR* as a normal topic after reading the existing msgs in *topicR*.
>>
>>
>> == can we configure each individual Samza task to read data from all
>> partitions from a topic?
>> It works in the 0.10.0 by using the broadcast stream. In the 0.9.0, you
>> have to "create topicR with the same number of partitions as *topicD*, and
>> replicate data to all partitions".
>>
>>
>> Hope this still helps.
>>
>>
>> Thanks,
>> Yan
>>
>>
>> At 2015-10-22 04:44:41, "Chen Song"  wrote:
>> >In our samza app, we need to read data from MySQL (reference table) with a
>> >stream. So the requirements are
>> >
>> >* Read data into each Samza task before processing any message.
>> >* The Samza task should be able to listen to updates happening in MySQL.
>> >
>> >I did some research after scanning through some relevant conversations and
>> >JIRAs on the community but did not find a solution yet. Neither I find a
>> >recommended way to do this.
>> >
>> >If my data streams comes from a topic called *topicD*, options in my mind
>> >are:
>> >
>> >   - Use Kafka
>> >  1. Use one of CDC based solution to replicate data in MySQL to a
>> >  topic Kafka. https://github.com/wushujames/mysql-cdc-projects/wiki.
>> >  Say the topic is called *topicR*.
>> >  2. In my Samza app, read reference table from *topicR *and persisted
>> >  in a cache in each Samza task's local storage.
>> > - If the data in *topicR *is NOT partitioned in the same way as
>> > *topicD*, can we configure each individual Samza task to read
>> data
>> > from all partitions from a topic?
>> > - If the answer to the above question is no, do I need to
>> >create *topicR
>> > *with the same number of partitions as *topicD*, and replicate
>> > data to all partitions?
>> > - On start, how to make Samza task to block processing the first
>> > message from *topicD* before reading all data from *topicR*.
>> >  3. Any new updates/deletes to *topicR *will be consumed to update
>> the
>> >  local cache of each Samza task.
>> >  4. On failure or restarts, each Samza task will read from the
>> >  beginning from *topicR*.
>> >   - Not Use Kafka
>> >  - Each Samza task reads a Snapshot of database and builds its local
>> >  cache, and it then needs to read periodically to update its
>> >local cache. I
>> >  have read about a few blogs, and this doesn't sound a solid way
>> >in the long
>> >  term.
>> >
>> >Any thoughts?
>> >
>> >Chen
>> >
>> >   -
>> >
>> >--
>> >Chen Song
>>
>
>
>
>-- 
>Chen Song


Re:Re:Need help in log4j.xml externalization

2015-10-26 Thread Yan Fang
" When I keep my log4j.xml in //$PWD/deploy/alice/config/log4j.xml and do not 
set any "samza.container.name" then it is not generating any log files for me. "


== this is weird. The run-am/run-container script should automatically create 
the samza.container.name if it is not set...











At 2015-10-26 19:57:52, "Patni, Ankush"  wrote:
>Hi Yang,
>
>Thanks a lot for help. I have couple of doubts and want to clear them :
>
>I am starting  samza task from same machine where it is running.
>My problem is : I am not using StreamAppender as suggested by you in previous 
>reply. But still DailyRollingFileAppender uses the ${samza.container.name}.
>
>When I keep my log4j.xml in //$PWD/deploy/alice/config/log4j.xml  and do not 
>set any "samza.container.name"  then it is not generating any log files for me.
>
>But when I set something export 
>JAVA_OPTS="-Dsamza.container.name=samza-application-master-task1"  then it 
>generates the desired file in the given location: ${samza.log.dir}
>
>
>
>Also when my log4j.xml is in my jar, everything runs fine; log files are 
>generated within the corresponding container. So is there any way I can 
>generate the log files in the corresponding task folders using an external log4j 
>and not ship it in my jar? There are more than 7 tasks I am running from the 
>same jar file.
>
>
>
>My log4j.xml looks like the following (the XML content was mangled by the 
>mail archive; the surviving fragments show a log4j:configuration using an 
>org.apache.log4j.DailyRollingFileAppender whose File param is 
>"${samza.log.dir}/${samza.container.name}.log", plus layout, logger, and 
>root elements that did not survive):
>
>
>Regards,
>Ankush
>***
>
>This email message and any attachments are intended solely for the use of the 
>addressee. If you are not the intended recipient, you are prohibited from 
>reading, disclosing, reproducing, distributing, disseminating or otherwise 
>using this transmission. If you have received this message in error, please 
>promptly notify the sender by reply email and immediately delete this message 
>from your system. This message and any attachments may contain information 
>that is confidential, privileged or exempt from disclosure. Delivery of this 
>message to any person other than the intended recipient is not intended to 
>waive any right or privilege. Message transmission is not guaranteed to be 
>secure or free of software viruses.
>***


Re:Re:Need help in log4j.xml externalization

2015-10-26 Thread Yan Fang
Hi Ankush,


In the current situation, if you run the Yarn client and the Yarn application on 
different machines, the logs will not be mixed. Otherwise, there is no other 
way, because they share the same environment variable: if it is set, they both 
will use it. You may want to open a JIRA for this request.


But IMHO, to get the logs for individual tasks, it is better to package them in 
different jars/tars. That may be easier to manage -- if one task is 
down/changed, it does not affect the other tasks. (This assumes what you mean by 
"task" is a "job", not a "task" in the context of Samza.)


Thanks,
Yan


At 2015-10-26 21:00:35, "Patni, Ankush"  wrote:
>
>Hi Yan,
>
>Small doubt :
>
>Is it possible to set the log4j.xml in somewhere so that it will not get mix 
>with yarn client and will be available for yarn application only. With this I 
>will be able to get the logs for my individual task in their respective 
>container.
>
>Ankush
>


Re:Re: questions of partition and task of Samza

2015-10-26 Thread Yan Fang
Hi Selina,


Your understanding is correct. Yes, you "need to consume the original input 
and send it back to Kafka, reset the key to *departmentName*, and then 
consume it again to count in Samza" if you want to count the number of students 
in the same *departmentName*. This is a typical aggregation use case. After 
aggregating the students in the same department, you can do more than just 
"count". :)


Cheers,
Yan
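The repartition-then-count plan above can be simulated in plain Java (this is a standalone sketch, not the Samza API; the class, record layout, and partitioner are illustrative). Re-keying by departmentName guarantees that every record for a department lands in one partition, so a single task can count it correctly:

```java
import java.util.*;

// Standalone simulation of the two-stage plan: records originally keyed by
// schoolName are re-keyed by departmentName, so each department lands in
// exactly one partition and a per-partition (per-task) count is exact.
public class RepartitionCount {
    // Same idea as Kafka's default partitioner: hash the new key.
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    // records are {name, schoolName, departmentName} triples.
    static Map<String, Integer> countByDepartment(String[][] records, int numPartitions) {
        // Steps 1-3: re-key every record by departmentName ("temp" topic).
        List<List<String>> temp = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) temp.add(new ArrayList<>());
        for (String[] r : records) temp.get(partitionFor(r[2], numPartitions)).add(r[2]);

        // Steps 4-6: each task counts only its own partition; the counts
        // are exact because a department never spans partitions.
        Map<String, Integer> counts = new TreeMap<>();
        for (List<String> partition : temp)
            for (String dept : partition) counts.merge(dept, 1, Integer::sum);
        return counts;
    }

    public static void main(String[] args) {
        String[][] records = {
            {"ann", "schoolA", "cs"}, {"bob", "schoolB", "cs"},
            {"carl", "schoolC", "math"}, {"dee", "schoolA", "math"},
        };
        System.out.println(countByDepartment(records, 3));  // {cs=2, math=2}
    }
}
```

Without the re-keying step, a task seeing only one school-keyed partition would produce partial counts per department, which is exactly the problem described in the question.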


At 2015-10-25 06:12:50, "Selina Tech"  wrote:
>Hi, Yan:
>
>  Thanks a lot for your reply.
>
>  You mentioned "if you give the msgs the same partition key", which
>mean same partition key value or  same partition key attribute name?
>
>   I mentioned "primary key" as "key" at public
>KeyedMessage(java.lang.String topic, K key, V message) or you can ignore
>it. I explain it in another way below.
>
>   If I need aggregate data, but the data are not in same partition, do
>we need consumer the data, and put it back it to Kafka with with new key
>and then consumer it again and aggregate it in Samza.
>
>  For example,  messages about student GPA information was send to
>Kafka by* K key(String schoolName)*. The message looks like "name,
>schoolName,  departmentName,  grade, GPA", and assuming I have 3
>partitions, With my understanding, all student records in one school should
>go to same partition.
>
>  Right now I need to aggregate data for same department, no matter
>which school.  which mean all the same departmentName message will be in
>three different partition. If I just count it in one samza job, will the
>result correct?  Do I need to consumer the original input and send it back
>to Kafka and reset the* Key to  departmentName *and then consume it again
>to count in Samza?
>
>     If I did not understand the partition and task of Samza, would you
>like to correct me?
>
>Sincerely,
>Selina
>
>On Sat, Oct 24, 2015 at 2:45 AM, Yan Fang  wrote:
>
>>
>>
>> Hi Selina,
>>
>>
>> what do you mean by "primary key" here? Is it one of the partitions of
>> "input" or something like "if one msg meets condition x, we think msg has
>> the primary key"?
>>
>>
>> If you just want to count the msgs, you can count in one Samza job and
>> send the result to "output" topic. You can send to any partition of the
>> "output" if you give the msgs the same partition key.
>>
>>
>> Thanks,
>> Yan
>>
>>
>>
>>
>>
>>
>>
>> At 2015-10-22 08:30:15, "Selina Tech"  wrote:
>> >Hi, All:
>> >
>> >In the Samza document, it mentioned "Each task consumes data from
>> >one partition for each of the job’s input streams." Does it mean that if the
>> >data processed by one job is not in one partition, the result will be wrong?
>> >
>> >Assuming my Samza input data on Kafka topic -- "input" is
>> >partitioned by default -- round robin. And I have five partitions. If my
>> >Samza job is to count messages by primary key of the message at "input"
>> >topic, and then output it to kafka topic -- "output".
>> >
>> >   So I need steps as below
>> >  1. read data from Kafka topic "input"
>> >  2. reset the partition key to "primary key" in Samza
>> >  3. produce it back to Kafka topic named as "temp"
>> >  4. read "temp" topic at Samza
>> >  5. count it in Samza
>> >  6. write it to Kafka topic named as "output"
>> >
>> >  If I just read data from Kafka topic "input" and count it in Samza
>> >and write it to topic "output". The result will not be correct because
>> there
>> >might have multiple messages for same "primary key" in "output" topic.  Do
>> >I understand it correctly?
>> >
>> >Sincerely,
>> >Selina
>>


Re:Samza processing reference data

2015-10-24 Thread Yan Fang
Hi Chen Song,


Sorry for the late reply. What you describe is a typical bootstrap use case. 
Check http://samza.apache.org/learn/documentation/0.9/container/streams.html , 
the bootstrap configuration. By using this one, Samza will always read the 
*topicR* from the beginning when it restarts. And then it treats the *topicR* 
as a normal topic after reading the existing msgs in *topicR*.


== can we configure each individual Samza task to read data from all partitions 
from a topic?
It works in the 0.10.0 by using the broadcast stream. In the 0.9.0, you have to 
"create topicR with the same number of partitions as *topicD*, and replicate 
data to all partitions".
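As a concrete sketch, the two options above map to configuration like this (property names from the Samza 0.9/0.10 streams documentation; "kafka" as the system name and the topic names are taken from this thread):

```properties
# 0.9+: mark topicR as a bootstrap stream -- on every (re)start Samza reads
# it fully from the oldest offset before processing other inputs, then
# treats it as a normal stream.
systems.kafka.streams.topicR.samza.bootstrap=true
systems.kafka.streams.topicR.samza.reset.offset=true
systems.kafka.streams.topicR.samza.offset.default=oldest

# 0.10+: broadcast topicR partition 0 to every task, so each task sees the
# reference data regardless of how topicD is partitioned.
task.broadcast.inputs=kafka.topicR#0
```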


Hope this still helps.


Thanks,
Yan


At 2015-10-22 04:44:41, "Chen Song"  wrote:
>In our samza app, we need to read data from MySQL (reference table) with a
>stream. So the requirements are
>
>* Read data into each Samza task before processing any message.
>* The Samza task should be able to listen to updates happening in MySQL.
>
>I did some research after scanning through some relevant conversations and
>JIRAs on the community but did not find a solution yet. Neither I find a
>recommended way to do this.
>
>If my data streams comes from a topic called *topicD*, options in my mind
>are:
>
>   - Use Kafka
>  1. Use one of CDC based solution to replicate data in MySQL to a
>  topic Kafka. https://github.com/wushujames/mysql-cdc-projects/wiki.
>  Say the topic is called *topicR*.
>  2. In my Samza app, read reference table from *topicR *and persisted
>  in a cache in each Samza task's local storage.
> - If the data in *topicR *is NOT partitioned in the same way as
> *topicD*, can we configure each individual Samza task to read data
> from all partitions from a topic?
> - If the answer to the above question is no, do I need to
>create *topicR
> *with the same number of partitions as *topicD*, and replicate
> data to all partitions?
> - On start, how to make Samza task to block processing the first
> message from *topicD* before reading all data from *topicR*.
>  3. Any new updates/deletes to *topicR *will be consumed to update the
>  local cache of each Samza task.
>  4. On failure or restarts, each Samza task will read from the
>  beginning from *topicR*.
>   - Not Use Kafka
>  - Each Samza task reads a Snapshot of database and builds its local
>  cache, and it then needs to read periodically to update its
>local cache. I
>  have read about a few blogs, and this doesn't sound a solid way
>in the long
>  term.
>
>Any thoughts?
>
>Chen
>
>   -
>
>-- 
>Chen Song


Re:questions of partition and task of Samza

2015-10-24 Thread Yan Fang


Hi Selina,


what do you mean by "primary key" here? Is it one of the partitions of "input" 
or something like "if one msg meets condition x, we think msg has the primary 
key"?


If you just want to count the msgs, you can count in one Samza job and send the 
result to "output" topic. You can send to any partition of the "output" if you 
give the msgs the same partition key.


Thanks,
Yan







At 2015-10-22 08:30:15, "Selina Tech"  wrote:
>Hi, All:
>
>In the Samza document, it mentioned "Each task consumes data from
>one partition for each of the job’s input streams." Does it mean that if the
>data processed by one job is not in one partition, the result will be wrong?
>
>Assuming my Samza input data on Kafka topic -- "input" is
>partitioned by default -- round robin. And I have five partitions. If my
>Samza job is to count messages by primary key of the message at "input"
>topic, and then output it to kafka topic -- "output".
>
>   So I need steps as below
>  1. read data from Kafka topic "input"
>  2. reset the partition key to "primary key" in Samza
>  3. produce it back to Kafka topic named as "temp"
>  4. read "temp" topic at Samza
>  5. count it in Samza
>  6. write it to Kafka topic named as "output"
>
>  If I just read data from Kafka topic "input" and count it in Samza
>and write it to topic "output". The result will not be correct because there
>might have multiple messages for same "primary key" in "output" topic.  Do
>I understand it correctly?
>
>Sincerely,
>Selina


Re:Need help in log4j.xml externalization

2015-10-24 Thread Yan Fang


Hi Ankush,


1. why we need to give first of all 
-Dsamza.container.name=samza-application-master as it is already present in 
run-am.sh


I think there is a confusion here. When you set export 
JAVA_OPTS="-Dlog4j.configuration=file://$PWD/deploy/alice/config/log4j.xml ", 
you only set it on the machine from which you submit the job, not the machines 
that run the job. 


To be more clear:


1. there are two parts of one Samza job: Yarn client and Yarn Application. Yarn 
client by default uses the log4j-console.xml, which can not use any 
StreamAppender. See here 
https://github.com/apache/samza/blob/master/samza-shell/src/main/bash/run-job.sh
 . Yarn Application uses the log4j.xml file that you want to externalize. See 
here 
https://github.com/apache/samza/blob/master/samza-shell/src/main/bash/run-am.sh 
.


2. If you use different machine to submit the Samza job and run the Samza job, 
the solution is simple: export 
JAVA_OPTS="-Dlog4j.configuration=file://$PWD/deploy/alice/config/log4j.xml " in 
all the machines which will run the Samza job.


3. If you use one of the machines that run Samza to submit the Samza job, 
there may be a bug, because the run-job.sh and run-am.sh scripts cannot 
differentiate which log4j.xml to use. So when you submit the job, it will use 
the Samza log4j.xml, which may have the StreamAppender. You can open a JIRA for 
this; it is an easy fix. If you do not use the StreamAppender, there should be 
no issue.


Hope this late reply still helps.


Thanks,
Yan







At 2015-10-21 21:38:34, "Patni, Ankush"  wrote:
>Hello Team,
>
>I am trying to externalize log4j from my task.
>
>So at present I run all the task from one tar.gz. And inside that I have 
>log4j.xml.
>
>But now I want to externalize the log4j.xml so that I can have more control 
>over logs. So before running my task I tried to set the JAVA_OPTS:
>
>
>export 
>JAVA_OPTS="-Dlog4j.configuration=file://$PWD/deploy/alice/config/log4j.xml "
>
>
>
>And I get the following error when I run my task :
>
>
>
>Like : deploy/alice/bin/run-job.sh 
>--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory 
>--config-path=file://$PWD/deploy/alice/config/EntryTask.properties
>
>
>
>log4j:ERROR Could not create an Appender. Reported error follows.
>
>org.apache.samza.SamzaException: Got null container name from system property: 
>samza.container.name. This is used as the key for the log appender, so can't 
>proceed.
>
>at 
> org.apache.samza.logging.log4j.StreamAppender.activateOptions(StreamAppender.java:89)
>
>at 
> org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
>
>at 
> org.apache.log4j.xml.DOMConfigurator.parseAppender(DOMConfigurator.java:295)
>
>at 
> org.apache.log4j.xml.DOMConfigurator.findAppenderByName(DOMConfigurator.java:176)
>
>
>
>Then I tried with following command :
>
>
>export 
>JAVA_OPTS="-Dlog4j.configuration=file://$PWD/deploy/alice/config/log4j.xml 
>-Dsamza.container.name=samza-application-master"
>
>
>log4j:ERROR Could not create an Appender. Reported error follows.
>java.lang.NullPointerException
>at java.io.StringReader.(StringReader.java:50)
>at 
> org.codehaus.jackson.JsonFactory.createJsonParser(JsonFactory.java:636)
>at 
> org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1863)
>at 
> org.apache.samza.logging.log4j.StreamAppender.getConfig(StreamAppender.java:180)
>at 
> org.apache.samza.logging.log4j.StreamAppender.activateOptions(StreamAppender.java:92)
>at 
> org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
>at 
> org.apache.log4j.xml.DOMConfigurator.parseAppender(DOMConfigurator.java:295)
>
>
>
>So my doubt is : why we need to give first of all 
>-Dsamza.container.name=samza-application-master as it is already present in 
>run-am.sh
>
>Second thing could anyone please help me in externilaztion of log4j.xml.
>
>  Regards,
>Ankush


Re: process killing

2015-09-22 Thread Yan Fang
Hi Jordi,

1. Are you running the job in one machine yarn? or in a cluster?

2. what kind of the java process do you see after killing the yarn
application? Because usually, when we run kill-yarn-job applicationId, we
do kill all the processes (this is actually done by the Yarn).

3. Which version of Samza and Yarn are you using ? This matters sometimes.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Tue, Sep 22, 2015 at 3:42 PM, Jordi Blasi Uribarri 
wrote:

> Hi,
>
> I am currently developing solution using samza and in the development
> process I need to constantly change the code and test in the system. What I
> am seeing is that most of the times I kill a job using the kill-yarn-job
> script the job gets killed according to the web interface but I see the
> java process running. I have also seen that the job was actually still being
> executed, as I got messages at the far end of the application. I have been
> manually killing these processes (kill -9 ) but I have some questions:
>
>
> -  Is there a reason for the processes not to be killed. It was
> not a matter of time as I could find them hours later.
>
> -  I don’t know if there should be any other action performed to
> completely clean the information, or whether killing the process the hard way is
> enough.
>
> -  I am finding some memory consumption problems that I don’t know
> if they are related with this. Maybe I will describe them in another
> message.
>
> Thnaks,
>
>   Jordi
> 
> Jordi Blasi Uribarri
> Área I+D+i
>
> jbl...@nextel.es
> Oficina Bilbao
>
>


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

2015-09-21 Thread Yan Fang


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, 
> > line 80
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065677#file1065677line80>
> >
> > this getId is for the global container Id, right?
> 
> Navina Ramesh wrote:
> What do you mean global container Id? This will be the containerId 
> assigned by Yarn

yes, when I say "global container Id", I mean the id assigned by Yarn. You 
fixed it by changing another one to samzaContainerId :)


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java,
> >  lines 96-99
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line96>
> >
> > 1. format the comment a little. :) using 
> > /*
> >  *
> >  */
> > 
> > 2. this comment does not explain the code following it. Maybe this 
> > comment should be part of the allocatedContainers variable javadoc.
> 
> Navina Ramesh wrote:
> Fixed the formatting. 
> The comment was intended to explain why we update the allocatedContainers 
> when only a containerRequest is made. So, I think it is better to leave it 
> here. I have re-worded it to explain better.

sure. Leaving it there sounds good.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java,
> >  line 113
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line113>
> >
> > just curious if it is possible that, the hostname of the machine is 
> > different from the before-":"-part of the HttpAddress?
> 
> Navina Ramesh wrote:
> what do you mean by different? the format of HttpAddress is 
> "$hostname:$port". We are only interested in the hostname here.

ok. I see.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java,
> >  line 116
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line116>
> >
> > can line 116 to 163 be refactored a little? Now it looks very confusing 
> > to me. (Too many nested if)
> > 
> > Is something like:
> > 
> > if (requestCountOnThisHost > 0 && (allocatedContainersOnThisHost == null
> >     || allocatedContainersOnThisHost.size() < requestCountOnThisHost)) {
> >   addToAllocatedContainerList(hostName, container);
> > } else {
> >   addToAllocatedContainerList(ANY_HOST, container);
> > }
> > 
> > sufficient for the logic?
> 
> Navina Ramesh wrote:
> Yeah. It should be sufficient. This is how I had it before. It became 
> hard to debug because you didn't know why a container was allocated to a buffer. 
> That's why I simplified the logic. I think for now it adds value to have 
> detailed logging until we can make sure that the feature is stable. Do you 
> think it is ok?

yes, it sounds ok to me.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java,
> >  line 280
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line280>
> >
> > this seems not safe to me.
> 
> Navina Ramesh wrote:
> why? how do you suggest I can change it?

how about Collections.unmodifiableMap() and Collections.unmodifiableList() ?
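The unmodifiable-view suggestion looks roughly like this (a plain-Java sketch; RequestState is a stand-in class, not the real ContainerRequestState):

```java
import java.util.*;

// Defensive exposure: internal state stays mutable, but callers only ever
// get a read-only view of it.
public class RequestState {
    private final Map<String, List<String>> allocatedContainers = new HashMap<>();

    void allocate(String host, String containerId) {
        allocatedContainers
            .computeIfAbsent(host, h -> new ArrayList<>())
            .add(containerId);
    }

    // Mutating the returned map throws UnsupportedOperationException. For
    // full safety the inner lists would need unmodifiable wrapping too.
    Map<String, List<String>> getAllocatedContainers() {
        return Collections.unmodifiableMap(allocatedContainers);
    }

    public static void main(String[] args) {
        RequestState state = new RequestState();
        state.allocate("host1", "container-0");
        try {
            state.getAllocatedContainers().clear();  // rejected by the view
        } catch (UnsupportedOperationException expected) {
            System.out.println("read-only view held");
        }
    }
}
```

Note the view is live (it reflects later allocations) but write-protected, which is usually what you want for state shared with monitoring or test code.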


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java, 
> > lines 79-80
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065680#file1065680line79>
> >
> > it is worth differenciating the containerId and container.getId
> 
> Navina Ramesh wrote:
> Ok. I will rename containerId to samzaContainerId. Does that sound ok?

yes, definitely.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java, 
> > lines 157-161
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065684#file1065684line157>
> >
> > can this be simplified as 
> > if 
> > (state.runningContainers.containsValue(containerStatus.getContainerId()), 
> > then ...
> 
> Navina Ramesh wrote:
> Yeah. but we are interested in the key for that entry. If you use an 
> if-condition to check whether the value is present, how will you return the 
> key? You won't have any handl

Re: Asynchronous approach and samza

2015-09-20 Thread Yan Fang
Hi Michael,

Samza is designed for high-throughput and realtime processing. If you are
using HTTP requests/external services, you may not get the same
performance as without them. However, technically speaking, there is
nothing blocking you from doing this (well, it is discouraged anyway :). Samza by
default does not provide this feature, so you may want to be a little cautious when
implementing it.

Thanks,

Fang, Yan
yanfang...@gmail.com
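One cautious way to implement the above is to keep the main loop single-threaded and hand slow external calls to a bounded pool. This is a plain-Java sketch, not the Samza API; fetchTitle() is a stand-in for the real HTTP call, and all names are illustrative:

```java
import java.util.*;
import java.util.concurrent.*;

// Offload blocking enrichment (page title, alexa rank, ...) to a bounded
// executor; the submitting loop never blocks per message.
public class AsyncEnrichment {
    // Daemon threads so the pool never blocks JVM shutdown.
    static final ExecutorService pool = Executors.newFixedThreadPool(8, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    static CompletableFuture<String> fetchTitle(String url) {
        return CompletableFuture.supplyAsync(() -> "title-of-" + url, pool);
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        for (String url : Arrays.asList("a.com", "b.com")) {
            inFlight.add(fetchTitle(url));  // submit without blocking the loop
        }
        // Drain completed work later (e.g. from a periodic window() call).
        // A real job must also cap in-flight requests and rethink
        // checkpointing, since a message is no longer "done" when the
        // process() call returns.
        for (CompletableFuture<String> f : inFlight) {
            System.out.println(f.join());
        }
    }
}
```

The caveats in the comments are exactly why the documentation discourages threads: once work leaves process(), offset commits no longer imply the message was fully handled.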

On Sun, Sep 20, 2015 at 4:28 PM, Michael Sklyar  wrote:

> Hi,
>
> What would be the best approach for doing "blocking" operations in Samza?
>
> For example, we have a kafka stream of urls for which we need to gather
> external data via HTTP (such as alexa rank, get the page title and
> headers..). Other scenarios include database access and decision making via
> a rule engine.
>
> Samza processes messages in a singe thread, HTTP requests might take
> hundreds of milliseconds. With the single threaded design the throughput
> would be very limited, which can be solved with an asynchronous approach.
> However Samza documentation explicitely states
> "*You are strongly discouraged from using threads in your job’s code*".
>
> It seems that Samza design suits very well "data transformation" scenarios,
> what is not clear is how well can it support external services?
>
> Thanks,
> Michael Sklyar
>
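The approach discussed above — keeping the task thread free by handing the blocking call to a background pool — can be sketched with plain JDK primitives. This is illustrative only, not Samza API; `fetchAlexaRank` and the pool size are hypothetical stand-ins for the external HTTP call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch only (plain JDK, no Samza API): hand the blocking lookup to a
// bounded pool so the single processing thread is not stalled per message.
public class AsyncEnrichSketch {
    static String fetchAlexaRank(String url) {
        return url + ":rank=42"; // stand-in for a slow HTTP request
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        List<Future<String>> pending = new ArrayList<>();
        for (String url : List.of("a.com", "b.com", "c.com")) {
            pending.add(pool.submit(() -> fetchAlexaRank(url)));
        }
        // later (e.g. on a periodic callback): drain completed results
        // and emit them downstream from the task thread
        for (Future<String> f : pending) {
            System.out.println(f.get());
        }
        pool.shutdown();
    }
}
```

The caution in the thread still applies: completed results are best drained back on the task thread (for example, in window()) rather than emitted from the pool threads directly.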


Re: Runtime Execution Model

2015-09-16 Thread Yan Fang
-- Hi Lukas,

I want to learn more about your production environment. How do you use
ProcessJobFactory
in Docker containers? Do you use one ProcessJobFactory process for all the
tasks, or spawn out as many threads as the number of tasks? How is
fault tolerance handled?


-- Hi Yi,

* Any progress on your side, in terms of the standalone job? (Chris' patch
is big, :)

*  Invert the JobCoordinator to the standalone Samza process s.t. the leader
process of the Samza job becomes the JobCoordinator
Currently, we run the JobCoordinator first, and then YARN talks to
the JobCoordinator. Isn't that enough so far?

*  Make the partition assignment as pluggable model to distribute the tasks to
all Samza processes in a job-group in coordination.
   I think the reason for this is Kafka's new feature. The API
design needs to be compatible with Kafka.

*  Make Samza process multi-threaded while maintaining the per-task
single-threaded
programming model for the users
   Do we already have this, or do we need to add it? I think this can be
done in the current ProcessJob: we can have the same number of threads as
tasks.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Tue, Sep 15, 2015 at 10:54 AM, Yi Pan  wrote:

> Hi, all,
>
> Thanks for pitching in for the improvement plan. We have actually discussed
> this for a while now. In a complete view, I think that there are the
> following issues need to be addressed:
> 1) Currently, the steps involved to launch a Samza process are too complex
> and intertwined with YARN.
> 2) The Samza partition assignment is embedded within YARN AppMaster
> implementation, which makes it difficult to run the job outside YARN
> environment
>
> We have actually already started some work to address the above issues:
> 1) SAMZA-516: support standalone Samza jobs. Chris has started this work
> and has a proto-type patch available. This allows a ZK-based coordination
> to start standalone Samza processes w/o YARN
>
> There are also planned changes to allow de-coupling of Samza job
> coordination logic from YARN AppMaster:
> 1) SAMZA-680 Invert the JobCoordinator and AM logic. This would allow us to
> keep the Samza-specific JobCoordinator logic independent from
> cluster-management systems.
>
> There is one more thing I am thinking: we may want to make the partition
> assignment logic as a pluggable module, such that we can choose different
> coordination mechanism in partition assignment as needed (e.g. ZK-based,
> cluster-management based, or Kafka-based coordination).
>
>
> Ultimately, I think that we should try to refactor the current job
> launching model to the following:
> 1) Make standalone Samza process the standard Samza process model
> 2) Invert the JobCoordinator to the standalone Samza process s.t. the
> leader process of the Samza job becomes the JobCoordinator
> 3) Make the partition assignment as pluggable model to distribute the tasks
> to all Samza processes in a job-group in coordination
> 4) Make launching of Samza process agnostic of cluster-management systems.
> The cluster-management systems will simply provide the functionality of
> placing the standard Samza processes to actual available nodes
> 5) Make Samza process multi-threaded while maintaining the per-task
> single-threaded programming model for the users.
>
> Thoughts?
>
> -Yi
>
>
>
>
>
> On Tue, Sep 15, 2015 at 9:50 AM, Hannes Stockner <
> hannes.stock...@gmail.com>
> wrote:
>
> > +1
> >
> >
> > On Tue, Sep 15, 2015 at 5:43 PM, Bruno Bonacci 
> > wrote:
> >
> > > Hi,
> > >
> > > I support what Lukas saying. Samza packaging requirements are not
> > friendly,
> > > I use the ThreadJobFactory for the same reason.
> > >
> > > Bruno
> > >
> > > On Tue, Sep 15, 2015 at 5:39 PM, Lukas Steiblys 
> > > wrote:
> > >
> > > > Hi Yan,
> > > >
> > > > We use Samza in a production environment using ProcessJobFactory in
> > > Docker
> > > > containers because it greatly simplifies our deployment process and
> > makes
> > > > much better use of resources.
> > > >
> > > > Is there any plan to make the ThreadJobFactory or ProcessJobFactory
> > > > multithreaded? I will look into doing that myself, but I think it
> might
> > > be
> > > > useful to implement this for everyone. I am sure there are plenty of
> > > cases
> > > > where people do not want to use YARN, but want more parallelism in
> > their
> > > > tasks.
> > > >
> > > > Lukas
> > > >
> > > > -Original Message- From: Yan Fang
> > > &

Re: Review Request 38049: SAMZA-769 Replace deprecated method call and fix warnings

2015-09-15 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/38049/#review99189
---



samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
 (line 157)
<https://reviews.apache.org/r/38049/#comment156100>

since this is from the http.close(), just try/catch the httpclient.close().

also, do not use printStackTrace(); use log.error(message, e) instead
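The suggested pattern can be sketched as follows. This is illustrative only: the `Closeable` and the `System.out` calls are stand-ins for the real httpclient and the project's slf4j logger (where it would read `log.error("failed to close http client", e)`).

```java
import java.io.Closeable;
import java.io.IOException;

// Sketch of the review suggestion: wrap only the close() call in
// try/catch and log the exception instead of calling printStackTrace().
public class CloseQuietlySketch {
    static void closeQuietly(Closeable c) {
        try {
            c.close();
            System.out.println("closed cleanly");
        } catch (IOException e) {
            // real code would use: log.error("failed to close http client", e)
            System.out.println("logged: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        closeQuietly(() -> { throw new IOException("boom"); });
        closeQuietly(() -> { });
    }
}
```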


- Yan Fang


On Sept. 2, 2015, 11:45 a.m., Aleksandar Bircakovic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/38049/
> ---
> 
> (Updated Sept. 2, 2015, 11:45 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Replaced deprecated method call and suppressed some warnings.
> 
> 
> Diffs
> -
> 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java bc926c5 
>   
> samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
>  7089796 
>   
> samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
>  b2d37a7 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
> c564964 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
>  4eaaec2 
>   
> samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java
>  a18d8e0 
> 
> Diff: https://reviews.apache.org/r/38049/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Aleksandar Bircakovic
> 
>



Re: Review Request 38296: SAMZA-341: Support metrics report via Ganglia

2015-09-14 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/38296/#review98895
---



build.gradle (line 219)
<https://reviews.apache.org/r/38296/#comment155670>

if we only use it for isEmpty, I do not think this is worth doing.



docs/learn/documentation/versioned/container/metrics.md (line 25)
<https://reviews.apache.org/r/38296/#comment155586>

as mentioned in SAMZA-340's review, can we move this part to the end of 
this doc? Because Kafka's way is a little more recommended. (no bias against 
Ganglia. :)



docs/learn/documentation/versioned/jobs/configuration-table.html (line 1387)
<https://reviews.apache.org/r/38296/#comment155652>

is this the default port for Ganglia or Graphite?



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 41)
<https://reviews.apache.org/r/38296/#comment155655>

add java doc



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 44)
<https://reviews.apache.org/r/38296/#comment155656>

should be SamzaGangliaReporter



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 62)
<https://reviews.apache.org/r/38296/#comment155665>

1. if there is only one port name, should it be UNICAST?
Maybe it is better to use getModeForAddress to determine it.

2. why is the ttl "1"? should we make it configurable?



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 66)
<https://reviews.apache.org/r/38296/#comment155667>

maybe throw a SamzaException to kill the job if Ganglia is not 
available?



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 72)
<https://reviews.apache.org/r/38296/#comment155671>

wrap it with SamzaException



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 101)
<https://reviews.apache.org/r/38296/#comment155673>

change to error level



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 112)
<https://reviews.apache.org/r/38296/#comment155674>

same, error level



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 123)
<https://reviews.apache.org/r/38296/#comment155675>

error level



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 143)
<https://reviews.apache.org/r/38296/#comment155676>

error



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 154)
<https://reviews.apache.org/r/38296/#comment155677>

error



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 165)
<https://reviews.apache.org/r/38296/#comment155678>

error



samza-ganglia/src/test/java/org/apache/samza/metrics/reporter/GangliaCounterTest.java
 (line 31)
<https://reviews.apache.org/r/38296/#comment155699>

when the tests are not run in the order shown in the class, the asserts will 
be incorrect.


- Yan Fang


On Sept. 11, 2015, 1:22 p.m., Aleksandar Pejakovic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/38296/
> ---
> 
> (Updated Sept. 11, 2015, 1:22 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Added a new module for Ganglia support.
> 
> Implemented code is based on 
> [SAMZA-340](https://issues.apache.org/jira/browse/SAMZA-340)
> 
> 
> Diffs
> -
> 
>   build.gradle 3a7fabc 
>   checkstyle/import-control.xml bc07ae8 
>   docs/learn/documentation/versioned/container/metrics.md 11a62f9 
>   docs/learn/documentation/versioned/jobs/configuration-table.html c23d7d3 
>   gradle/dependency-versions.gradle 36d564b 
>   
> samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/GangliaCounter.java
>  PRE-CREATION 
>   
> samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/GangliaGauge.java
>  PRE-CREATION 
>   
> samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/GangliaReporterFactory.java
>  PRE-CREATION 
>   
> samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/GangliaSnapshot.java
>  PRE-CREATION 
>   
> samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/GangliaTimer.java
>  PRE-CREATION 
>   
> samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.jav

Re: Runtime Execution Model

2015-09-14 Thread Yan Fang
Hi Bruno,

AFAIK, there is no existing JobFactory that spawns as many threads as the
partition number. But I think nothing stops you from implementing this: you
can get the partition information from the JobCoordinator, and then spawn as
many threads as the partition/task number.

Since the two local factories (ThreadJobFactory and ProcessJobFactory) are
mainly for development, there is no additional documentation. But most of
the code there is self-explanatory.

Thanks,

Fang, Yan
yanfang...@gmail.com
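The suggestion above — one thread per partition — can be sketched with plain JDK primitives (this is not an existing JobFactory; the partition count and dispatch loop are simulated). A single-threaded executor per partition preserves per-partition ordering while the partitions make progress in parallel:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch only: route each message to the executor owned by its partition,
// so messages within a partition are processed in order.
public class PerPartitionThreadsSketch {
    public static void main(String[] args) throws Exception {
        int numPartitions = 5; // would come from the JobCoordinator's job model
        Map<Integer, ExecutorService> perPartition = new HashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            perPartition.put(p, Executors.newSingleThreadExecutor());
        }
        StringBuffer seen = new StringBuffer(); // thread-safe appends
        for (int msg = 0; msg < 10; msg++) {
            final int m = msg;
            final int partition = m % numPartitions; // same partition -> same thread
            perPartition.get(partition).submit(() -> seen.append("p" + partition + ":m" + m + " "));
        }
        for (ExecutorService es : perPartition.values()) {
            es.shutdown();
            es.awaitTermination(5, TimeUnit.SECONDS);
        }
        System.out.println(seen.toString().trim());
    }
}
```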

On Sat, Sep 12, 2015 at 1:47 PM, Bruno Bonacci 
wrote:

> Hi,
> I'm looking for additional documentation on the different RUNTIME
> EXECUTION MODELS of the different `job.factory.class`.
>
> I'm particularly interested on how each factory (ThreadJobFactory,
> ProcessJobFactory and YarnJobFactory) will create tasks consume and process
> messages out of Kafka and the thread model used.
>
> I did a few tests with the ThreadJob factory consuming out of a kafka
> topic with 5 partitions and I was expecting that it would use multiple
> threads to consume/process the different partitions, however it is
> using only one thread at runtime.
>
> Is there any way to tell Samza to use multiple processing threads (1 per
> partition)??
>
>
> Thanks
> Bruno
>


Re: memory limits

2015-09-11 Thread Yan Fang
Hi Jordi,

I believe you can change the memory via *yarn.container.memory.mb*; the default
is 1024. And *yarn.am.container.memory.mb* is for the AM memory.

See
http://samza.apache.org/learn/documentation/0.9/jobs/configuration-table.html

Thanks,
Fang, Yan
yanfang...@gmail.com
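For reference, a minimal job-properties fragment setting both limits. The 512 values are illustrative for a small lab box; note that YARN may round container requests up to the scheduler's minimum allocation.

```properties
# Memory requested per Samza container (default: 1024 MB)
yarn.container.memory.mb=512
# Memory requested for the YARN ApplicationMaster (default: 1024 MB)
yarn.am.container.memory.mb=512
```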

On Fri, Sep 11, 2015 at 4:21 AM, Jordi Blasi Uribarri 
wrote:

> Hi,
>
> I am trying to implement an environment that requires multiple combined
> samza jobs for different tasks. I see that there is a limit to the number
> of jobs that can be running at the same time as they block 1GB of ram each.
> I understand that this is a reasonable limit in a production environment
> (as long as we are speaking of Big Data, we need big amounts of resources ☺
> ) but my lab does not have so much ram. Is there a way to reduce this limit
> so I can test it properly? I am using Samza 0.9.
>
> Thanks in advance,
>
>Jordi
> 
> Jordi Blasi Uribarri
> Área I+D+i
>
> jbl...@nextel.es
> Oficina Bilbao
>
> [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]
>


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

2015-09-10 Thread Yan Fang
aTaskManager.java (line 
186)
<https://reviews.apache.org/r/37817/#comment154981>

use the PREEMPTED instead of -100?



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 
244)
<https://reviews.apache.org/r/37817/#comment154989>

why not just give the currentFailCount and lastFailureTime default values? 
Then we can get rid of the "else" part. It looks simpler.



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 
245)
<https://reviews.apache.org/r/37817/#comment154986>

should be 1. because it is used in line 270



samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala (line 69)
<https://reviews.apache.org/r/37817/#comment154728>

it should be amJavaHome.isEmpty(), because we set "" as the default value 
in YarnConfig.


- Yan Fang


On Sept. 9, 2015, 12:28 a.m., Navina Ramesh wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37817/
> ---
> 
> (Updated Sept. 9, 2015, 12:28 a.m.)
> 
> 
> Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi 
> Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-619
> https://issues.apache.org/jira/browse/SAMZA-619
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This is a major change to how we request and assign containers to resources. It 
> uses a threaded model for request and allocation. More comments in the 
> javadoc. 
> 
> Major changes to note:
> 1. Changed yarn dependency to 2.6
> 2. Moved YarnConfig to java config class
> 2. Removed SamzaAppMasterTaskManager
> 3. SamzaAppState replaces SamzaAppMasterState
> 4. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that 
> it delegates onContainerAllocated and requestContainers to the thread in 
> ContainerAllocator 
> 5. Removed state.unclaimedContainers
> 6. Allocator thread sleep time and request timeout is configurable
> 7. host-affinity can be enabled by using the config 
> "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with 
> FairScheduler that has continuous scheduling enabled. Details on this config 
> can be found at SAMZA-617 
> [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
> 8. Added unit tests for TaskManager & ContainerAllocator 
> 9. Updated config documentation
> 
> Pending items:
> 1. Update web-site with info on this feature (SAMZA-668)
> 
> 
> Diffs
> -
> 
>   build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
>   docs/learn/documentation/versioned/jobs/configuration-table.html 
> c23d7d37f151a0dbdd71f52588773bc67edf88c8 
>   gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java 
> c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
>  5455881e2a4a9c865137f2708f655641b93c2bfb 
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 
> 7b592740c082225fc217a24188bc874f883db63b 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 
> c8438bd6ed01a95860867505d84c04c493a5a197 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 
> PRE-CREATION 
>   
> samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
>  PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java 
> PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java 
> PRE-CREATION 
>   
> samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java 
> PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java 
> PRE-CREATION 
>   
> samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
>  PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java 
> PRE-CREATION 
>   
> samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java 
> PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java 
> PRE-CREATION 
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml 
> ce2145adde09b0e8097a1c711bc296ee20392e63 
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 
> 1f51551893c42bb13d7941c8b0e5594ac7f42585 
&

Re: not able to submit samza job to resource manager

2015-09-04 Thread Yan Fang
Hi Raja,

For the logs, even though the tracking URL is not available, you may still be
able to see them by using *yarn logs -applicationId * (or
by going to the logs directory)

If you are using Samza 0.9.0, I am afraid this version may not work
with a secured cluster. If you are using the master branch (compiled by
yourself), can you try to apply the patch at
https://issues.apache.org/jira/browse/SAMZA-727? If it works, we can commit
this patch.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Sep 4, 2015 at 11:23 AM, Raja.Aravapalli  wrote:

> Hi Yan,
>
> I couldn't see application present in the ResourceManager / JHS for the
> application id. And as you can see in my below email, tracking url is also
> showing as N/A.
>
> Only one thing that is working is: "yarn application -status "
>
> 
>  Application Report :
>  Application-Id : 
>  Application-Name : samza-testing_1
>  Application-Type : Samza
>  User : 
>  Queue : default
>  Start-Time : 1441387039186
>  Finish-Time : 1441387039210
>  Progress : 0%
>  State : FAILED
>  Final-State : FAILED
>  Tracking-URL : N/A
>  RPC Port : -1
>  AM Host : N/A
>  Aggregate Resource Allocation : 0 MB-seconds, 0 vcore-seconds
>  Diagnostics : User:  is not
> allowed to impersonate 
>  
>
>
> Our cluster is secured with Kerberos. Any specific setting I should set
> when working in secure cluster ??
>
>
>
> Regards,
> Raja Mahesh Aravapalli.
>
> -Original Message-
> From: Yan Fang [mailto:yanfang...@gmail.com]
> Sent: Friday, September 04, 2015 11:48 PM
> To: dev@samza.apache.org
> Subject: Re: not able to submit samza job to resource manager
>
> Hi Raja,
>
> Can you check the yarn's log? This gives us more information to see what
> is the problem.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Fri, Sep 4, 2015 at 10:23 AM, Raja.Aravapalli <
> raja.aravapa...@target.com
> > wrote:
>
> > Hi
> >
> > Please help me identify the problem:
> >
> > I am not able to submit the samza job to yarn RM. Found this when I
> > ran the "yarn application -status "
> >
> > 
> > Application Report :
> > Application-Id : 
> > Application-Name : samza-testing_1
> > Application-Type : Samza
> > User : 
> > Queue : default
> > Start-Time : 1441387039186
> > Finish-Time : 1441387039210
> > Progress : 0%
> > State : FAILED
> > Final-State : FAILED
> > Tracking-URL : N/A
> > RPC Port : -1
> > AM Host : N/A
> > Aggregate Resource Allocation : 0 MB-seconds, 0 vcore-seconds
> > Diagnostics : User:  is not
> > allowed to impersonate 
> > 
> >
> > Please help me identify, what's stopping me submitting in to cluster ?
> >
> > Thank you.
> >
> >
> > Regards,
> > Raja Mahesh Aravapalli.
> >
> >
>


Re: not able to submit samza job to resource manager

2015-09-04 Thread Yan Fang
Hi Raja,

Can you check the yarn's log? This gives us more information to see what is
the problem.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Sep 4, 2015 at 10:23 AM, Raja.Aravapalli  wrote:

> Hi
>
> Please help me identify the problem:
>
> I am not able to submit the samza job to yarn RM. Found this when I ran
> the "yarn application -status "
>
> 
> Application Report :
> Application-Id : 
> Application-Name : samza-testing_1
> Application-Type : Samza
> User : 
> Queue : default
> Start-Time : 1441387039186
> Finish-Time : 1441387039210
> Progress : 0%
> State : FAILED
> Final-State : FAILED
> Tracking-URL : N/A
> RPC Port : -1
> AM Host : N/A
> Aggregate Resource Allocation : 0 MB-seconds, 0 vcore-seconds
> Diagnostics : User:  is not
> allowed to impersonate 
> 
>
> Please help me identify, what's stopping me submitting in to cluster ?
>
> Thank you.
>
>
> Regards,
> Raja Mahesh Aravapalli.
>
>


Re: One task sending payload to multiple output streams

2015-09-03 Thread Yan Fang
Hi Elangovan,

I think not providing the partition key is the reason. Can you try putting in
a partition key, such as 0,1,2,4, to see how it works? This uses the
default partitioner class. For better control, you may provide your own
partitioner class. This would demonstrate the cause of your problem.

Thanks,

Fang, Yan
yanfang...@gmail.com
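The effect of an explicit partition key can be sketched with the usual hash-mod scheme (illustrative only, not Kafka's exact 0.8 partitioner; in Samza, the key would be passed via the OutgoingMessageEnvelope constructor that takes a partition key). Without a key, the old 0.8 producer tends to stick keyless messages to a small set of randomly chosen partitions for each metadata refresh interval, which matches the "only some partitions get data" symptom below.

```java
// Sketch of hash-based partition assignment: an explicit key spreads
// messages across all partitions deterministically.
public class PartitionKeySketch {
    static int partitionFor(Object key, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 4; // e.g. output_stream_1 in the thread below
        for (int key = 0; key < 4; key++) {
            System.out.println("key=" + key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```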

On Wed, Sep 2, 2015 at 10:27 AM, Balusamy, Elangovan <
elangovan.balus...@altisource.com> wrote:

> We have one container for each partition, we are not providing any
> partition key.
>
> messageCollector.send(new OutgoingMessageEnvelope(output_stream_1, "this
> is for you"));
> messageCollector.send(new OutgoingMessageEnvelope(output_stream_2,  "this
> is for the other guy"));
>
> this was an example, the actual message payload is a HashMap serialized
> into bytes.
>
>   ByteArrayOutputStream b = new ByteArrayOutputStream();
>   ObjectOutputStream o = new ObjectOutputStream(b);
>   o.writeObject(map);
>   bytes = b.toByteArray();
>
> -Original Message-
> From: Yi Pan [mailto:nickpa...@gmail.com]
> Sent: Wednesday, September 02, 2015 10:31 PM
> To: dev@samza.apache.org
> Subject: Re: One task sending payload to multiple output streams
>
> Hi, Elangovan,
>
> Could you confirm how many containers in your job? And how is the outgoing
> messages partitioned on? Most likely, this is related to the choice on the
> outgoing message partition key, which is the only deciding factor for which
> partition of a topic the message is sent to.
>
> -Yi
>
> On Wed, Sep 2, 2015 at 12:58 AM, Balusamy, Elangovan <
> elangovan.balus...@altisource.com> wrote:
>
> > The task consumes from one stream with 2 partitions.
> >
> > -Original Message-
> > From: Garry Turkington [mailto:g.turking...@improvedigital.com]
> > Sent: Wednesday, September 02, 2015 1:12 PM
> > To: dev@samza.apache.org
> > Subject: RE: One task sending payload to multiple output streams
> >
> > Hi,
> >
> > How many input streams does this task consume and how are they
> partitioned?
> >
> > Garry
> >
> > -Original Message-
> > From: Balusamy, Elangovan [mailto:elangovan.balus...@altisource.com]
> > Sent: 02 September 2015 08:19
> > To: dev@samza.apache.org
> > Cc: Chandra, Saurabh
> > Subject: One task sending payload to multiple output streams
> >
> > Folks,
> >
> > We are running a multi-node Samza cluster with multiple partitions for
> > each task. In one of the tasks, we would like to send output to two
> > different tasks and the payload also is different. Below is the code
> > that does it
> >
> > messageCollector.send(new OutgoingMessageEnvelope(output_stream_1,
> > "this is for you"));
> >
> > messageCollector.send(new OutgoingMessageEnvelope(output_stream_2,
> > "this is for the other guy"));
> >
> > output_stream_1 has 4 partitions
> > output_stream_2 has 2 partitions
> >
> > We see that  only 50% of the  partitions are being used, the other 50%
> > doesn't get any messages.
> >
> > output_stream_1 has messages only in 2 partitions and output_stream_2
> > has messages only in 1 partition.
> >
> >
> >
> > Samza version: 0.9.0
> >
> >
> > Kafka Version:  0.8.2.1
> >
> >
> > Thanks
> > Elango
> >
> > **
> > *
> >
> > This email message and any attachments are intended solely for the use
> > of the addressee. If you are not the intended recipient, you are
> > prohibited from reading, disclosing, reproducing, distributing,
> > disseminating or otherwise using this transmission. If you have
> > received this message in error, please promptly notify the sender by
> > reply email and immediately delete this message from your system. This
> > message and any attachments may contain information that is
> > confidential, privileged or exempt from disclosure. Delivery of this
> > message to any person other than the intended recipient is not
> > intended to waive any right or privilege. Message transmission is not
> guaranteed to be secure or free of software viruses.
> >
> > **
> > *
> >
> > -
> > No virus found in this message.
> > Checked by AVG - www.avg.com
> > Version: 2014.0.4830 / Virus Database: 4365/10512 - Release Date:
> > 08/25/15 Internal Virus Database is out of date.
> >
> > **
> > *
> >
> > This email message and any attachments are intended solely for the use
> > of the addressee. If you are not the intended recipient, you are
> > prohibited from reading, disclosing, reproducing, distributing,
> > disseminating or otherwise using this transmission. If you have
> > received this message in error, please promptly notify the sender by
> > reply email and immediately delete this message from your system. This
> > message and any attachments may cont

Re: samza-hello-samza build cannot find samza 0.10.0-SNAPSHOT artifacts on maven

2015-08-28 Thread Yan Fang
Did you run ./gradlew publishToMavenLocal?

Fang, Yan
yanfang...@gmail.com

On Fri, Aug 28, 2015 at 10:06 AM, Yi Pan  wrote:

> Hi, all,
>
> I tried to build samza-hello-samza from latestt branch today and it failed
> due to the missing artifacts w/ 0.10.0-SNAPSHOT version on maven. The
> README.md file does not mention how to access the *-SNAPSHOT version of
> artifacts either. I am curious how the build for samza-hello-samza on
> latest branch work?
>
> @Yan, could you help to share your points here?
>
> Thanks!
>
> -Yi
>


Re: Why do I always receive waring email?

2015-08-26 Thread Yan Fang
Thanks. I filed a JIRA https://issues.apache.org/jira/browse/INFRA-10207 .
Waiting for any feedback.

Fang, Yan
yanfang...@gmail.com

On Wed, Aug 26, 2015 at 3:33 AM, Dan  wrote:

> I also get this too now and again.
>
> I think it must be an apache e-mail issue, can we get someone from their
> infrastructure team to have a look?
>
>  - Dan
>
>
> On 26 August 2015 at 06:02, Yan Fang  wrote:
>
> > I actually do not know... I receive it frequently as well... But since I
> > still can get emails, ignoring it
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Tue, Aug 25, 2015 at 1:59 PM, Selina Tech 
> > wrote:
> >
> > > Dear all:
> > >
> > >  I received a few warning email as below. Does anyone know how
> > should I
> > > avoid those warning emails?
> > >
> > > Sincerely,
> > > Selina
> > >
> > > - -- - - - - - -
> > > Hi! This is the ezmlm program. I'm managing the
> > > dev@samza.apache.org mailing list.
> > >
> > > I'm working for my owner, who can be reached
> > > at dev-ow...@samza.apache.org.
> > >
> > >
> > > Messages to you from the dev mailing list seem to
> > > have been bouncing. I've attached a copy of the first bounce
> > > message I received.
> > >
> > > If this message bounces too, I will send you a probe. If the probe
> > bounces,
> > > I will remove your address from the dev mailing list,
> > > without further notice . . . . . .
> > >
> > >  - - -- - - - - -
> > >
> >
>


Re: Why do I always receive waring email?

2015-08-25 Thread Yan Fang
I actually do not know... I receive it frequently as well... But since I
can still get emails, I have been ignoring it

Fang, Yan
yanfang...@gmail.com

On Tue, Aug 25, 2015 at 1:59 PM, Selina Tech  wrote:

> Dear all:
>
>  I received a few warning email as below. Does anyone know how should I
> avoid those warning emails?
>
> Sincerely,
> Selina
>
> - -- - - - - - -
> Hi! This is the ezmlm program. I'm managing the
> dev@samza.apache.org mailing list.
>
> I'm working for my owner, who can be reached
> at dev-ow...@samza.apache.org.
>
>
> Messages to you from the dev mailing list seem to
> have been bouncing. I've attached a copy of the first bounce
> message I received.
>
> If this message bounces too, I will send you a probe. If the probe bounces,
> I will remove your address from the dev mailing list,
> without further notice . . . . . .
>
>  - - -- - - - - -
>


Re: SAMZA build failing!!!

2015-08-24 Thread Yan Fang
Hi Raja,

Can you just do
1. clone https://github.com/apache/samza.git
2. cd samza
3. gradlew clean build

You do not need the other steps actually, unless you have a very specific
reason. This may not help, but before I dig into the problem, at least let
us try this.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Mon, Aug 24, 2015 at 9:42 PM, Raja.Aravapalli  wrote:

> Hi Fang,
>
> I followed below steps:
>
> 1. I downloaded the code from https://github.com/apache/samza.git, cloned
> to desktop.
> 2. "cd" into code directory
> 3. Ran "gradle -b bootstrap.gradle"
> 4. then tried below two ways, it doesn't work in either
> a. gradlew -PscalaVersion=2.10 clean build
> b. gradlew clean build
>
> while running tests for samza-core the build is failing while executing the
> samza-core:test task. And below is the error message that I am receiving...
>
> ==
> 1 warning
> :samza-autoscaling_2.10:javadoc
> :samza-autoscaling_2.10:javadocJar
> :samza-autoscaling_2.10:sourcesJar
> :samza-autoscaling_2.10:signArchives SKIPPED
> :samza-autoscaling_2.10:assemble
> :samza-autoscaling_2.10:checkstyleMain
> :samza-autoscaling_2.10:compileTestJava UP-TO-DATE
> :samza-autoscaling_2.10:compileTestScala UP-TO-DATE
> :samza-autoscaling_2.10:processTestResources UP-TO-DATE
> :samza-autoscaling_2.10:testClasses UP-TO-DATE
> :samza-autoscaling_2.10:checkstyleTest UP-TO-DATE
> :samza-autoscaling_2.10:test UP-TO-DATE
> :samza-autoscaling_2.10:check
> :samza-autoscaling_2.10:build
> :samza-core_2.10:javadocJar
> :samza-core_2.10:sourcesJar
> :samza-core_2.10:signArchives SKIPPED
> :samza-core_2.10:assemble
> :samza-core_2.10:checkstyleMain
> :samza-core_2.10:compileTestJava
> Note: C:\Users\z013sqm\Desktop\POCs\samza1 -
> Copy\samza-core\src\test\java\org\apache\samza\coordinator\stream\TestCoordinatorStreamWriter.java
> uses unchecked or unsafe operat
> ions.
> Note: Recompile with -Xlint:unchecked for details.
> :samza-core_2.10:compileTestScala
> [ant:scalac] Element samza1 - Copy\samza-core\build\resources\main' does
> not exist.
> :samza-core_2.10:processTestResources
> :samza-core_2.10:testClasses
> :samza-core_2.10:checkstyleTest
> :samza-core_2.10:test
>
> testCanReadPropertiesConfigFiles FAILED
> java.lang.IllegalArgumentException: Illegal character in authority at
> index 7: file://samza1 - Copy\samza-core/src/test/resources/test.proper
> ties
> at java.net.URI.create(URI.java:859)
> at
> org.apache.samza.config.factories.TestPropertiesConfigFactory.testCanReadPropertiesConfigFiles(TestPropertiesConfigFactory.scala:34)
>
> Caused by:
> java.net.URISyntaxException: Illegal character in authority at
> index 7: file://samza1 - Copy\samza-core/src/test/resources/test.propertie
> s
> at java.net.URI$Parser.fail(URI.java:2829)
> at java.net.URI$Parser.parseAuthority(URI.java:3167)
> at java.net.URI$Parser.parseHierarchical(URI.java:3078)
> at java.net.URI$Parser.parse(URI.java:3034)
> at java.net.URI.(URI.java:595)
> at java.net.URI.create(URI.java:857)
> ... 1 more
>
> testStorageEngineReceivedAllValues FAILED
> org.junit.ComparisonFailure:
> expected:<[/tmp/testing/state/testStore/]Partition_1> but
> was:<[\tmp\testing\state\testStore\]Partition_1>
> at org.junit.Assert.assertEquals(Assert.java:123)
> at org.junit.Assert.assertEquals(Assert.java:145)
> at
> org.apache.samza.storage.TestStorageRecovery.testStorageEngineReceivedAllValues(TestStorageRecovery.java:84)
>
> 150 tests completed, 3 failed, 1 skipped
> :samza-core_2.10:test FAILED
>
> FAILURE: Build failed with an exception.
>
> * What went wrong:
> Execution failed for task ':samza-core_2.10:test'.
> > There were failing tests. See the report at:
> file:///samza1%20-%20Copy/samza-core/build/reports/tests/index.html
>
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or
> --debug option to get more log output.
>
> BUILD FAILED
>
>
> ==
>
> Please guide me on what I am doing wrong! Also, please suggest a document
> where I can find the build steps...
>
>
> Regards,
> Raja Mahesh Aravapalli.
>
> -Original Message-
> From: Yan Fang [mailto:yanfang...@gmail.com]
> Sent: Monday, August 24, 2015 8:10 PM
> To: dev@samza.apache.org
> Subject: Re: SAMZA build failing!!!
>
> Hi Raja,
>
> Do you only run samza-core or the whole samza project? I downloaded the
> samza from master branch and ran ./gradlew clean build. There is no error.
> Could you give a little more information on how you got this error?

Re: Review Request 37604: SAMZA-760 Samza Container should catch Throwables instead of just catching Exceptions

2015-08-24 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37604/#review96145
---


Overall, LGTM. Could you also add a unit test to verify this? Thank you.


samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (line 
581)
<https://reviews.apache.org/r/37604/#comment151430>

throwable, not exception in the log msg


- Yan Fang


On Aug. 19, 2015, 8:14 a.m., Aleksandar Bircakovic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37604/
> ---
> 
> (Updated Aug. 19, 2015, 8:14 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Added a catch for Throwables in the Samza container. Catching Throwables can 
> cause problems in specific situations, so I also added a partial function 
> 'safely' that takes care of those situations.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 85b012b 
> 
> Diff: https://reviews.apache.org/r/37604/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Aleksandar Bircakovic
> 
>
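The 'safely' partial function described in the patch above is a well-known guard pattern. Below is a hypothetical Java rendering of the idea (the actual patch is Scala, and this is only a sketch of the concept, not the patch itself): catch Throwable, but never swallow the kinds that must propagate, and hand everything else to a recovery handler.

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

// Sketch only: catch Throwable safely by rethrowing fatal errors and
// preserving interrupt status, while routing recoverable failures to a handler.
public final class Safely {
    public static <T> T run(Callable<T> block, Function<Throwable, T> handler) {
        try {
            return block.call();
        } catch (VirtualMachineError e) {
            throw e; // OutOfMemoryError, StackOverflowError: must propagate
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return handler.apply(e);
        } catch (Throwable t) {
            return handler.apply(t); // recoverable: log, count, continue
        }
    }
}
```

The key design point is that a bare `catch (Throwable t)` would also swallow OutOfMemoryError, which is exactly the situation the review is worried about.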



Re: [Discuss/Vote] upgrade to Yarn 2.6.0

2015-08-24 Thread Yan Fang
Hi Roger,

If you plan to upgrade to 2.6.0, and no other companies are still using
2.4.0, I think we can upgrade to Yarn 2.6.0 in 0.10.0.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Aug 20, 2015 at 4:48 PM, Yi Pan  wrote:

> Hi, Selina,
>
> Samza 0.9.1 on YARN 2.6 is the proved working solution.
>
> Best,
>
> -Yi
>
> On Thu, Aug 20, 2015 at 12:28 PM, Selina Tech 
> wrote:
>
> > Hi, Yi:
> >  If I use Samza0.9.1 and Yarn2.6.0, Will the system be failed?
> >
> > Sincerely,
> > Selina
> >
> > On Wed, Aug 19, 2015 at 1:58 PM, Yi Pan  wrote:
> >
> > > Hi, Roger,
> > >
> > > In LinkedIn we have already moved to YARN 2.6 and is moving to YARN 2.7
> > > now. I am not aware of any major issues in upgrading. I will let our
> team
> > > member Jon Bringhurst to chime in since he did all the upgrade and may
> > have
> > > more insights.
> > >
> > > @Jon, could you help to comment on this?
> > >
> > > Thanks!
> > >
> > > -Yi
> > >
> > > On Wed, Aug 19, 2015 at 9:12 AM, Roger Hoover 
> > > wrote:
> > >
> > > > We're using 2.4.0 in production.  Are there any major
> incompatibilities
> > > to
> > > > watch out for when upgrading to 2.6.0?
> > > >
> > > > Thanks,
> > > >
> > > > Roger
> > > >
> > > > On Mon, Aug 17, 2015 at 4:41 PM, Yan Fang 
> > wrote:
> > > >
> > > > > Hi guys,
> > > > >
> > > > > we have been discussing upgrading to Yarn 2.6.0 (SAMZA-536
> > > > > <https://issues.apache.org/jira/browse/SAMZA-536>), because there
> > are
> > > > some
> > > > > bug fixes after 2.4.0 and we can not enable the Yarn RM recovering
> > > > feature
> > > > > in Yarn 2.4.0 (SAMZA-750 <
> > > > https://issues.apache.org/jira/browse/SAMZA-750
> > > > > >)
> > > > > .
> > > > >
> > > > > So we just want to make sure if any production users are still
> using
> > > Yarn
> > > > > 2.4.0 and do not plan to upgrade to 2.6.0+?
> > > > >
> > > > > If not further concern, I think we can go and upgrade to Yarn 2.6.0
> > in
> > > > > Samza 0.10.0 release.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Fang, Yan
> > > > > yanfang...@gmail.com
> > > > >
> > > >
> > >
> >
>


Re: SAMZA build failing!!!

2015-08-24 Thread Yan Fang
Hi Raja,

Do you only run samza-core or the whole samza project? I downloaded the
samza from master branch and ran ./gradlew clean build. There is no error.
Could you give a little more information on how you got this error?

Thanks,

Fang, Yan
yanfang...@gmail.com

On Mon, Aug 24, 2015 at 9:54 AM, Raja.Aravapalli  wrote:

> Hi,
>
>
> I wasn't able to build SAMZA to execute the Samza jobs.
>
>
> Receiving below exception while executing samza-core_2.10.
>
> I checked out the "master" branch from https://github.com/apache/samza.git
> and am trying to build!
>
> 
>
> * What went wrong:
> Execution failed for task ':samza-core_2.10:test'.
>
>
> :samza-core_2.10:processTestResources
> :samza-core_2.10:testClasses
> :samza-core_2.10:checkstyleTest
> :samza-core_2.10:test
>
> testCanReadPropertiesConfigFiles FAILED
> java.lang.IllegalArgumentException: Illegal character in authority at
> index 7: file://samza1\samza-core/src/test/resources/test.properties
> at java.net.URI.create(URI.java:859)
> at
> org.apache.samza.config.factories.TestPropertiesConfigFactory.testCanReadPropertiesConfigFiles(TestPropertiesConfigFactory.scala:34)
>
> Caused by:
> java.net.URISyntaxException: Illegal character in authority at
> index 7: file://samza1\samza-core/src/test/resources/test.properties
> at java.net.URI$Parser.fail(URI.java:2829)
> at java.net.URI$Parser.parseAuthority(URI.java:3167)
> at java.net.URI$Parser.parseHierarchical(URI.java:3078)
> at java.net.URI$Parser.parse(URI.java:3034)
> at java.net.URI.(URI.java:595)
> at java.net.URI.create(URI.java:857)
> ... 1 more
>
>
> 
>
>
> Can someone please help me fix this. Thank you.
>
>
> Regards,
> Raja Mahesh Aravapalli.
>
>
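For the record, the failure above is a plain java.net.URI quirk rather than anything Samza-specific: concatenating "file://" with a Windows path that contains spaces puts the path into the URI's authority component, where spaces and backslashes are illegal. A minimal illustration (a sketch, not Samza's actual fix), with `File.toURI()` as the usual workaround:

```java
import java.io.File;
import java.net.URI;

public class FileUriDemo {
    public static void main(String[] args) {
        // A path with spaces, like "samza1 - Copy" in the report above.
        String path = "samza1 - Copy\\samza-core\\src\\test\\resources\\test.properties";
        try {
            // Everything after "file://" up to the first '/' is parsed as the
            // authority; the space and '\' there make the URI invalid.
            URI.create("file://" + path);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        // File.toURI() percent-encodes the spaces and yields a valid URI.
        System.out.println(new File(path).toURI().getScheme()); // prints "file"
    }
}
```

This is why the tests pass on Linux but fail when the checkout directory name contains spaces on Windows.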


Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-08-20 Thread Yan Fang
Diff: https://reviews.apache.org/r/34974/diff/


Testing
---


Thanks,

Yan Fang



Re: Review Request 37536: SAMZA-710 Update WebServlet and RestServlet to read coordinatorStream information

2015-08-20 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37536/#review95965
---


Can you also test this patch? It does not start the LatestConfigManager 
anywhere, which may cause problems. Thank you.


samza-core/src/main/java/org/apache/samza/container/ConfigManager.java (line 29)
<https://reviews.apache.org/r/37536/#comment151161>

How about renaming to LatestConfigManager? Then it is more specific and 
does not confuse with other config related classes.



samza-core/src/main/java/org/apache/samza/container/ConfigManager.java (line 33)
<https://reviews.apache.org/r/37536/#comment151159>

the source should be "Job-coordinator"



samza-core/src/main/java/org/apache/samza/container/ConfigManager.java (line 42)
<https://reviews.apache.org/r/37536/#comment151160>

copy/paste error. :)



samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
 (line 100)
<https://reviews.apache.org/r/37536/#comment151157>

if no other "manager" uses this method, maybe we can consider 
moving it into the LatestConfigManager?



samza-core/src/test/scala/org/apache/samza/config/TestConfigManager.scala (line 
34)
<https://reviews.apache.org/r/37536/#comment151155>

wrong name.



samza-core/src/test/scala/org/apache/samza/config/TestConfigManager.scala (line 
41)
<https://reviews.apache.org/r/37536/#comment151156>

can this be a little more concrete? The thing we want to test is that 
configManager gets the current config, then we update the config, and then 
we verify that configManager gets the "latest" config.


- Yan Fang


On Aug. 20, 2015, 9:19 a.m., Aleksandar Bircakovic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37536/
> ---
> 
> (Updated Aug. 20, 2015, 9:19 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> WebServlet and RestServlet now read information from coordinator stream 
> consumer and get new config.
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml aaa235a 
>   samza-core/src/main/java/org/apache/samza/container/ConfigManager.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
>  ca97ce8 
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b59274 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> a926ce6 
>   samza-core/src/test/scala/org/apache/samza/config/TestConfigManager.scala 
> PRE-CREATION 
>   
> samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
>  09f4dc3 
>   
> samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
>  7fd5122 
> 
> Diff: https://reviews.apache.org/r/37536/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Aleksandar Bircakovic
> 
>



Re: Review Request 37642: SAMZA-695 Update the StreamAppender doc

2015-08-20 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37642/#review95964
---



docs/learn/documentation/versioned/jobs/logging.md (line 100)
<https://reviews.apache.org/r/37642/#comment151153>

can it be something like "and change the name of the log stream with the 
param 'StreamName'"? Because this is not the StreamAppender's name.



docs/learn/documentation/versioned/jobs/logging.md (line 104)
<https://reviews.apache.org/r/37642/#comment151154>

how about adding a comment  "" here?


- Yan Fang


On Aug. 20, 2015, 9:25 a.m., Aleksandar Pejakovic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37642/
> ---
> 
> (Updated Aug. 20, 2015, 9:25 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Added requested param to logging.md
> 
> 
> Diffs
> -
> 
>   docs/learn/documentation/versioned/jobs/logging.md d1b372c 
> 
> Diff: https://reviews.apache.org/r/37642/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Aleksandar Pejakovic
> 
>



[DISCUSS] KIP-28 - Add a transform client for data processing

2015-08-19 Thread Yan Fang
Hi Guozhang,

Thank you for writing the KIP-28 up. (Hope this is the right thread for me to 
post some comments. :) 

I still have some confusion about the implementation of the Processor:

1. why do we maintain a separate consumer and producer for each worker thread?
— from my understanding, the new consumer API will be able to fetch specific 
topic-partitions. Is one consumer enough for one Kafka process (shared 
among worker threads)? The same goes for the producer: is one producer enough 
for sending out messages to the brokers? Would this give better performance?

2. how is the “Stream Synchronization” achieved?
— you talked about “pause” and “notify” the consumer. Still not very clear. 
If worker thread has group_1 {topicA-0, topicB-0} and group_2 {topicA-1, 
topicB-1}, and topicB is much slower. How can we pause the consumer to sync 
topicA and topicB if there is only one consumer?

3. how does the partition timestamp monotonically increase?
— “When the lowest timestamp corresponding record gets processed by the 
thread, the partition time possibly gets advanced.” How does the “gets 
advanced” work? Do we get another “lowest message timestamp value”? But doing 
this, may not get an “advanced” timestamp.

4. thoughts about the local state management.
— from the description, I think there is one kv store per partition-group. 
That means if one work thread is assigned more than one partition groups, it 
will have more than one kv-store connections. How can we avoid mis-operation? 
Because one partition group can easily write to another partition group’s kv 
store (they are in the same thread). 

5. do we plan to implement the throttle ?
— since we are "forwarding" the messages, it is very possible that the 
upstream processor is much faster than the downstream processor; how do we plan 
to deal with this?

6. how does the parallelism work?
— do we achieve this by simply adding more threads? Or do we plan to have a 
mechanism that can deploy different threads to different machines? It is easy 
to imagine deploying different processors to different machines, but what 
about the worker threads? And how does fault tolerance work? Maybe this is 
out of scope for the KIP?

Two nits in the KIP-28 doc:

1. The "close" method is missing from the Processor interface, while we have 
the "override close()" in KafkaProcessor.

2. “punctuate” does not accept a parameter, while StatefulProcessJob has a 
punctuate method that accepts one.

Thanks,
Yan
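On question 3, one way to picture a monotonically advancing partition time (this is only an illustrative sketch of one possible scheme, not code from the KIP): the partition time is the minimum timestamp over the records that are buffered but not yet processed, so it can only move forward as older records drain.

```java
import java.util.PriorityQueue;

// Sketch: track buffered-record timestamps in a min-heap; partition time is
// the smallest remaining one and therefore never decreases.
public class PartitionTime {
    private final PriorityQueue<Long> buffered = new PriorityQueue<>();
    private long partitionTime = Long.MIN_VALUE;

    public void onRecordBuffered(long ts) {
        buffered.add(ts);
    }

    // Returns the (possibly advanced) partition time after processing a record.
    public long onRecordProcessed(long ts) {
        buffered.remove(ts);
        Long lowest = buffered.peek();
        if (lowest != null && lowest > partitionTime) {
            partitionTime = lowest; // advances only when the oldest record drains
        }
        return partitionTime;
    }
}
```

Under this model, "gets advanced" means re-reading the new lowest buffered timestamp, which answers the question of why the value never moves backward.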

Re: Review Request 37528: SAMZA-736 BrokerProxy will stuck in infinite loop if consumer.fetch throws OOME

2015-08-18 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37528/#review95817
---



samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 
(lines 163 - 164)
<https://reviews.apache.org/r/37528/#comment150958>

will this stop the Container, or just log the error and finish this thread?

If we want to stop the Container, I think we need to throw a SamzaException 
here.
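For context on why this review comment matters, a small generic-Java demonstration (not Samza code): a Throwable thrown inside a worker thread only ends that thread; the rest of the process keeps running unless an uncaught-exception handler (or a rethrown SamzaException on the main path) escalates it.

```java
// Demonstrates that an exception in a worker thread does not stop the process.
public class ThreadDeathDemo {
    public static void main(String[] args) throws InterruptedException {
        final boolean[] handled = {false};
        Thread worker = new Thread(() -> {
            throw new RuntimeException("fetch failed"); // kills only this thread
        });
        worker.setUncaughtExceptionHandler((t, e) -> handled[0] = true);
        worker.start();
        worker.join();
        // The worker died, but the "container" (this process) is still alive.
        System.out.println("worker alive: " + worker.isAlive()
                + ", handled: " + handled[0]); // prints "worker alive: false, handled: true"
    }
}
```

This is why simply logging inside the BrokerProxy thread would leave the container running without a fetcher, while rethrowing lets the failure surface.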



samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala 
(line 46)
<https://reviews.apache.org/r/37528/#comment150960>

why is the order changed?



samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala 
(line 366)
<https://reviews.apache.org/r/37528/#comment150959>

this test does not seem to test anything. :)


- Yan Fang


On Aug. 18, 2015, 12:58 p.m., Aleksandar Pejakovic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37528/
> ---
> 
> (Updated Aug. 18, 2015, 12:58 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Added new catch blocks to prevent infinite loops
> 
> 
> Diffs
> -
> 
>   
> samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
>  376b277 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 
> 614f33f 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
>  e285dec 
> 
> Diff: https://reviews.apache.org/r/37528/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Aleksandar Pejakovic
> 
>



Re: Use one producer for both coordinator stream and users system?

2015-08-18 Thread Yan Fang
Hi Tao,

First, each Kafka producer has its own I/O thread (correct me if I am wrong).

Second, after Samza 0.10.0, we have a coordinator stream, which stores the
checkpoint, config and other locality information for auto-scaling, dynamic
configuration, etc purpose. (See Samza-348
<https://issues.apache.org/jira/browse/SAMZA-348>). So we have a producer
for this coordinator stream.

Therefore, each container will have at least two producers: one for the
coordinator stream and one for the user's system.

My question is, can we use only one producer for both the coordinator stream
and the user's system to get better performance? (From the Kafka doc, it may
achieve better performance.)

Thanks,

Fang, Yan
yanfang...@gmail.com

On Mon, Aug 17, 2015 at 9:49 PM, Tao Feng  wrote:

> Hi Yan,
>
> Naive question: what do we need producer thread of coordinator stream for?
>
> Thanks,
> -Tao
>
> On Mon, Aug 17, 2015 at 2:09 PM, Yan Fang  wrote:
>
> > Hi guys,
> >
> > I have this question because Kafka's doc
> > <
> >
> http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
> > >
> > seems recommending having one producer shared by all threads ("*The
> > producer is thread safe and should generally be shared among all threads
> > for best performance.*"), while currently the coordinator stream is
> using a
> > separate producer (usually, there are two producers(two producer threads)
> > in each container: one is for the coordinator stream , one is for the
> > "real" job)
> >
> > 1. Will having one producer shared by all thread really improve the
> > performance? (haven't done the perf test myself. Guess Kafka has some
> > proof).
> >
> > 2. if yes, should we go this way?
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
>
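To make the thread-count point concrete, here is a toy model in plain Java (deliberately not the Kafka client API, and only a sketch of the idea under discussion): one background sender thread drains a single queue no matter how many logical streams feed it, which is why sharing one producer would remove one I/O thread per container.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Toy model: a single background "I/O thread" serves every logical stream
// (coordinator stream, user streams) through one shared queue.
public class SharedSender {
    private final BlockingQueue<Entry<String, String>> queue = new LinkedBlockingQueue<>();

    public SharedSender(Consumer<Entry<String, String>> transport) {
        Thread ioThread = new Thread(() -> {
            try {
                while (true) transport.accept(queue.take());
            } catch (InterruptedException ignored) {
                // interrupted: shut the sender down
            }
        });
        ioThread.setDaemon(true);
        ioThread.start();
    }

    // Thread-safe: callers for any stream enqueue into the same queue.
    public void send(String topic, String msg) {
        queue.add(new SimpleEntry<>(topic, msg));
    }
}
```

The real Kafka producer works similarly (one background sender per producer instance), which is the basis of its "share among all threads" recommendation; the open question in the thread is whether Samza's coordinator-stream writer can safely share that instance.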


Re: remote kafka producer -- kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries

2015-08-18 Thread Yan Fang
Thanks, Selina, for sharing this solution for the reference. :)

Cheers,

Fang, Yan
yanfang...@gmail.com

On Mon, Aug 17, 2015 at 7:17 PM, Job-Selina Wu 
wrote:

> Hi, All:
>
>   Finally I fixed this bug.
>
> 1. set advertised.host.name in config/server.properties to the AWS
> *private IP* address
> (not the public *DNS*)
> 2. comment out host.name in config/server.properties
> 3. In the remote Java producer:
> props.put("metadata.broker.list", brokerPrivateIp + ":9092");
> The value brokerPrivateIp is the same as advertised.host.name
>  in config/server.properties
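Pulling the three steps above together (the IP address below is a placeholder for illustration, not an actual value from this thread):

```properties
# config/server.properties on the AWS broker
#host.name=localhost                 # step 2: leave host.name commented out
advertised.host.name=10.0.1.23       # step 1: the instance's *private* IP, not its public DNS

# step 3, on the remote Java producer side, metadata.broker.list must point
# at the same private IP the broker advertises:
#   props.put("metadata.broker.list", "10.0.1.23:9092");
```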
>
>  This bug blocked me for a while...
>
> Sincerely,
> Selina
>
> On Thu, Aug 13, 2015 at 9:56 PM, Job-Selina Wu 
> wrote:
>
> > Dear All:
> >
> >I got kafka.common.FailedToSendMessageException: Failed to send
> > messages after 3 tries as below. When I have a remote java Kafka producer
> > try to produce message to Kafka broker Server. Both Producer and Broker
> are
> > at AWS cloud. BTW, I tried my code first at local machine and Virtual
> > machine first, It did not work either.(advertised.host.name was set to
> > the ip address of the kafka server*)*
> >
> > *-**This is my KafkaProducer at remote Producer for producer
> > configuration**-*
> > public class KafkaProducer {
> >
> > Properties props = new Properties();
> >
> > private final Producer producer;
> > private final String kafkaServerIP = "52.19.2.74:9092";
> >
> > public KafkaProducer() {
> >
> >
> > props.put("metadata.broker.list", kafkaServerIP);
> > //props.put("bootstrap.servers", "localhost:9092 ");
> > props.put("serializer.class", "kafka.serializer.StringEncoder");
> > props.put("advertised.host.name", "localhost");
> > props.put("request.required.acks", "0");
> >
> > ProducerConfig config = new ProducerConfig(props);
> >
> > producer = new Producer(config);
> > }
> >
> > public Producer getProducer() {
> >
> > return this.producer;
> > }
> > }
> >
> >
> > *The configs/server.properties at Kafka Server at AWS*-
> >
> > zookeeper.connect=localhost:2181
> > zookeeper.connection.timeout.ms=6000
> >
> > delete.topic.enable=true
> >
> > broker.id=0
> > port=9092
> > host.name=localhost
> > *advertised.host.name *=
> > ec2-51-18-21-235.us-west-1.compute.amazonaws.com
> >
> > # below is same as default
> > #advertised.port=
> > #advertised.port=
> > num.network.threads=3
> > num.io.threads=8
> > socket.send.buffer.bytes=102400
> > socket.receive.buffer.bytes=102400
> > socket.request.max.bytes=104857600
> > log.dirs=/tmp/kafka-logs
> > num.partitions=1
> > num.recovery.threads.per.data.dir=1
> > #log.flush.interval.messages=1
> > #log.flush.interval.ms=1000
> > log.retention.hours=168
> > #log.retention.bytes=1073741824
> > log.segment.bytes=1073741824
> > log.retention.check.interval.ms=30
> > log.cleaner.enable=false
> >
> >
> > - - --- - - - - --
> >
> > kafka.common.FailedToSendMessageException: Failed to send messages after
> 3
> > tries.
> > kafka.common.FailedToSendMessageException: Failed to send messages after
> 3
> > tries.
> > at
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > at kafka.producer.Producer.send(Producer.scala:77)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> > at com.cinarra.kafka.Main.main(Main.java:21)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:606)
> > at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
> > at java.lang.Thread.run(Thread.java:745)
> >
> >
> > reference:
> >
> >
> http://stackoverflow.com/questions/30217255/cant-connect-to-a-remote-kafka-producer-from-windows-through-java-code
> >
> > Your help is highly appreciated,
> > Selina
> >
> >
>


Re: KIP-28 kafka processor

2015-08-18 Thread Yan Fang
Thanks, Chris and Jay.

So do we add comments in this thread? Seems I cannot leave comments in
Confluence. :)

Thanks,

Fang, Yan
yanfang...@gmail.com

On Mon, Aug 17, 2015 at 11:05 PM, Yi Pan  wrote:

> Hi, Chris and Jay,
>
> Thanks for the reminder. I plan to follow up this week.
>
> Cheers!
>
> -Yi
>
> On Sun, Aug 16, 2015 at 12:27 PM, Jay Kreps  wrote:
>
> > +1 Any feedback would be appreciated!
> >
> > -Jay
> >
> > On Sat, Aug 15, 2015 at 3:55 PM, Chris Riccomini 
> > wrote:
> >
> > > Hey all,
> > >
> > > I wanted to call attention to KIP-28:
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client
> > >
> > > This is the result of the last conversation that we had about
> > > samza's future direction.
> > >
> > > It would be good to have the samza community involved in this.
> > >
> > > Cheers,
> > > Chris
> > >
> >
>


Re: Review Request 37536: SAMZA-710 Update WebServlet and RestServlet to read coordinatorStream information

2015-08-17 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37536/#review95672
---


I believe this patch works, but could you also add a unit test to verify the 
correctness? Also, overusing the localityManager is a little concerning; could 
you think of another way? Thank you.


samza-core/src/main/java/org/apache/samza/job/model/JobModel.java (line 74)
<https://reviews.apache.org/r/37536/#comment150767>

this is a little overuse of the localityManager. It is not designed to get 
the latest config.



samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
 (line 106)
<https://reviews.apache.org/r/37536/#comment150765>

sanitize it as well.


- Yan Fang


On Aug. 17, 2015, 2:56 p.m., Aleksandar Bircakovic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37536/
> ---
> 
> (Updated Aug. 17, 2015, 2:56 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> WebServlet and RestServlet now read information from coordinator stream 
> consumer and get new config.
> 
> 
> Diffs
> -
> 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
>  ca97ce8 
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b59274 
>   
> samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
>  09f4dc3 
>   
> samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
>  7fd5122 
> 
> Diff: https://reviews.apache.org/r/37536/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Aleksandar Bircakovic
> 
>



[Discuss/Vote] upgrade to Yarn 2.6.0

2015-08-17 Thread Yan Fang
Hi guys,

we have been discussing upgrading to Yarn 2.6.0 (SAMZA-536
), because there are some
bug fixes after 2.4.0 and we cannot enable the Yarn RM recovery feature
in Yarn 2.4.0 (SAMZA-750 )
.

So we just want to make sure if any production users are still using Yarn
2.4.0 and do not plan to upgrade to 2.6.0+?

If there are no further concerns, I think we can go ahead and upgrade to Yarn
2.6.0 in the Samza 0.10.0 release.

Thanks,

Fang, Yan
yanfang...@gmail.com


Use one producer for both coordinator stream and users system?

2015-08-17 Thread Yan Fang
Hi guys,

I have this question because Kafka's doc

seems to recommend having one producer shared by all threads ("*The
producer is thread safe and should generally be shared among all threads
for best performance.*"), while currently the coordinator stream is using a
separate producer (usually, there are two producers (two producer threads)
in each container: one for the coordinator stream and one for the
"real" job)

1. Will having one producer shared by all threads really improve the
performance? (I haven't done the perf test myself; I guess Kafka has some
proof.)

2. if yes, should we go this way?

Thanks,

Fang, Yan
yanfang...@gmail.com


Re: [SAMZA-423] Integrate Lucene into Samza

2015-08-16 Thread Yan Fang
Hi Robert,

Thank you for the contribution and very sorry for the late reply. :(

Left some comments on the JIRA about the design doc. Hope they help you
polish your design.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Aug 13, 2015 at 12:29 AM, Robert Zuljevic 
wrote:

> Hello all,
>
>
>
> I’ve added a design document to task SAMZA-423. If anyone is interested
> please take a look : )
>
>
>
> I have some outstanding questions that I would like to resolve so I could
> continue working on this as soon as possible:
>
>
>
> 1.   Should the API support bulk indexing/removing/matching? (Note
> that in some cases this might end up being only cosmetic.)
>
> 2.   Should the match API return the objects representing indexed
> elements or their wrappers (as suggested in the design document)?
>
> 3.   Should this task be renamed to something like “Create Document
> Store”, with two other tasks created for the concrete implementations in
> Luwak and Lucene (which is the end goal)?
>
>
>
> As stated in the ticket any and all suggestions/comments/criticisms are
> welcome : )
>
>
>
> Met vriendelijke groet / Kind regards,
>
> Robert Žuljević
>
> Software Developer
>
> --
>
> Address: Trifkovicev trg 6, 21000 Novi Sad, Serbia
>
> Tel.: +31 20 6701 947 | +381 21 2155 500
>
> Mobile: +381 64 428 28 46
>
> Skype: robert.zuljevic
>
> Internet: www.levi9.com
>
>
>
> Chamber of commerce Levi9 Holding: 34221951
>
> Chamber of commerce Levi9 IT Services BV: 34224746
> --
>
> This e-mail may contain confidential or privileged information. If you are
> not (one of) the intended recipient(s), please notify the sender
> immediately by reply e-mail and delete this message and any attachments
> permanently without retaining a copy. Any review, disclosure, copying,
> distribution or taking any action in reliance on the contents of this
> e-mail by persons or entities other than the intended recipient(s) is
> strictly prohibited and may be unlawful.
>
> The services of Levi9 are exclusively subject to its general terms and
> conditions. These general terms and conditions can be found on
> www.levi9.com and a copy will be promptly submitted to you on your
> request and free of charge.
>
>
>


Re: remote kafka producer -- kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries

2015-08-16 Thread Yan Fang
Hi Selina,

If this is "pure" Kafka, I suggest posting this question on the Kafka mailing
list; there are more experts in that community, though there are some Kafka
experts here as well. :)

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Aug 13, 2015 at 9:56 PM, Job-Selina Wu 
wrote:

> Dear All:
>
>I got kafka.common.FailedToSendMessageException: Failed to send
> messages after 3 tries as below. When I have a remote java Kafka producer
> try to produce message to Kafka broker Server. Both Producer and Broker are
> at AWS cloud. BTW, I tried my code first at local machine and Virtual
> machine first, It did not work either.(advertised.host.name was set to
> the ip address of the kafka server*)*
>
> *-**This is my KafkaProducer at remote Producer for producer
> configuration**-*
> public class KafkaProducer {
>
> Properties props = new Properties();
>
> private final Producer producer;
> private final String kafkaServerIP = "52.19.2.74:9092";
>
> public KafkaProducer() {
>
>
> props.put("metadata.broker.list", kafkaServerIP);
> //props.put("bootstrap.servers", "localhost:9092 ");
> props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("advertised.host.name", "localhost");
> props.put("request.required.acks", "0");
>
> ProducerConfig config = new ProducerConfig(props);
>
> producer = new Producer(config);
> }
>
> public Producer getProducer() {
>
> return this.producer;
> }
> }
>
>
> *The configs/server.properties at Kafka Server at AWS*-
>
> zookeeper.connect=localhost:2181
> zookeeper.connection.timeout.ms=6000
>
> delete.topic.enable=true
>
> broker.id=0
> port=9092
> host.name=localhost
> *advertised.host.name *=
> ec2-51-18-21-235.us-west-1.compute.amazonaws.com
>
> # below is same as default
> #advertised.port=
> #advertised.port=
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/tmp/kafka-logs
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> #log.flush.interval.messages=1
> #log.flush.interval.ms=1000
> log.retention.hours=168
> #log.retention.bytes=1073741824
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=30
> log.cleaner.enable=false
>
>
> - - --- - - - - --
>
> kafka.common.FailedToSendMessageException: Failed to send messages after 3
> tries.
> kafka.common.FailedToSendMessageException: Failed to send messages after 3
> tries.
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> at kafka.producer.Producer.send(Producer.scala:77)
> at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> at com.cinarra.kafka.Main.main(Main.java:21)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
> at java.lang.Thread.run(Thread.java:745)
>
>
> reference:
>
> http://stackoverflow.com/questions/30217255/cant-connect-to-a-remote-kafka-producer-from-windows-through-java-code
>
> Your help is highly appreciated,
> Selina
>
>


Re: Kill All Jobs

2015-08-13 Thread Yan Fang
Hi Jordi,

Thanks. This is useful. If possible, can you open a JIRA and upload the
patch there?

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Aug 6, 2015 at 8:04 AM, Shekar Tippur  wrote:

> Thanks Jordi. This really helps.
>
> - Shekar
>
> On Thu, Aug 6, 2015 at 12:21 AM, Jordi Blasi Uribarri 
> wrote:
>
> > Hi,
> >
> > As a little present (and I know this is not the way to get the code in
> the
> > project, but I am new to this sharing). I just made a simple script to
> kill
> > all the jobs running in Samza. It is supposed to live with
> kill-yarn-job.sh
> > in the bin folder. It saves me time, so maybe someone finds it helpful.
> >
> >
> > [[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export
> > JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname
> > $0)/log4j-console.xml"
> >
> > exec $(dirname $0)/run-class.sh
> > org.apache.hadoop.yarn.client.cli.ApplicationCLI application -list | grep
> > application_ | awk -F ' ' '{ print $1 }' | while read linea
> > do
> >   $(dirname $0)/kill-yarn-job.sh $linea
> > done
> >
> > Hope it helps.
> >
> >Bye
> >
> > Jordi
> > 
> > Jordi Blasi Uribarri
> > Área I+D+i
> >
> > jbl...@nextel.es
> > Oficina Bilbao
> >
> > [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]
> >
>
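For reference, the grep/awk stage of Jordi's script can be exercised against canned `application -list` output. This is only a sketch — the application IDs and job names below are made up, and the real script pipes the output of the YARN ApplicationCLI instead of `printf`:

```shell
# Simulated `yarn application -list` output piped through the same
# filter the kill-all script uses to extract application IDs.
printf '%s\n' \
  'Total applications: 2' \
  'application_1438600000_0001  my-samza-job  SAMZA  RUNNING' \
  'application_1438600000_0002  other-job     SAMZA  RUNNING' \
| grep application_ | awk -F ' ' '{ print $1 }'
# Each printed ID would then be passed to kill-yarn-job.sh
```

Note that `awk -F ' '` (a single space) is awk's special default field separator, so runs of whitespace are collapsed and `$1` is always the application ID.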


Review Request 37428: SAMZA-723: hello-samza hangs when using StreamAppender

2015-08-12 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37428/
---

Review request for samza.


Bugs: SAMZA-723
https://issues.apache.org/jira/browse/SAMZA-723


Repository: samza


Description
---

Fixed 2 parts of the problem:
1. Do not start the StreamAppender until the JobCoordinator is running
2. Fix the deadlock between the producer thread and the main thread

More explanation is in the JIRA.


Diffs
-

  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
a926ce6 
  samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java 
209296d 
  samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java 
8948453 
  
samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
 3e81240 

Diff: https://reviews.apache.org/r/37428/diff/


Testing
---


Thanks,

Yan Fang



Re: Review Request 37102: SAMZA-753: BrokerProxy stop should shutdown kafka consumer first

2015-08-12 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37102/
---

(Updated Aug. 13, 2015, 1:12 a.m.)


Review request for samza.


Changes
---

Added a unit test for the closing.


Bugs: SAMZA-753
https://issues.apache.org/jira/browse/SAMZA-753


Repository: samza


Description
---

shutdown the kafka consumer before interrupting the BrokerProxy


Diffs (updated)
-

  samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 
614f33f 
  
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala 
e285dec 

Diff: https://reviews.apache.org/r/37102/diff/


Testing
---


Thanks,

Yan Fang



Re: Mailing list join request

2015-08-11 Thread Yan Fang
shouldn't you send to dev-subscr...@samza.apache.org ? :)

Fang, Yan
yanfang...@gmail.com

On Wed, Aug 12, 2015 at 12:33 PM, Eli Reisman 
wrote:

> subscribe
>


Re: Missing a change log offset for SystemStreamPartition

2015-08-10 Thread Yan Fang
07)
> at
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> at
> org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
> at
> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
> at
> org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
> at
> org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> at
> org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>
> Is there any other info I can attach to help find the problem?
>
> Thanks,
>
>   Jordi
>
> -Mensaje original-
> De: Yan Fang [mailto:yanfang...@gmail.com] Enviado el: viernes, 07 de
> agosto de 2015 23:21
> Para: dev@samza.apache.org
> Asunto: Re: Missing a change log offset for SystemStreamPartition
>
> Hi Jordi,
>
> Sorry for getting back to you late. I was quite busy yesterday.
>
> I think the reason for your error is that you have mismatched Samza and
> Kafka versions.
>
> It seems that you are using Samza 0.8.1 with Kafka 0.8.2, which is not a
> supported combination.
>
> So my suggestion is to upgrade to *Samza 0.9.1*, then use *Kafka 0.8.2*.
> This combination is proven to work.
>
> Hope this helps you.
>
> Thanks,
>
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Thu, Aug 6, 2015 at 1:16 AM, Jordi Blasi Uribarri 
> wrote:
>
> > I changed the job name and the store name. I was defining two
> > different stores and in case that was the problem, I also eliminated the
> second one.
> > I am getting the same exception.
> >
> > Exception in thread "main" org.apache.samza.SamzaException: Missing a
> > change log offset for SystemStreamPartition [kafka, testdb-changelog, 2].
> > at
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
> > at
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
> > at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> > at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> > at
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
> > at
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
> > at
> >
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> > at
> >
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> > at
> >
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> > at
> >
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> > at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> > at
> >
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> > at
> scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> > at
> >
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> > at
> >
> org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
> > at
> >
> org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
> > at
> >
> org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
> > at
> >
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> > at
> >
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > at
> > scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> > at
> >
> org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
> > at
> > org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
> > at
> >
> org.apache.samza.container.SamzaContainer$

Re: Missing a change log offset for SystemStreamPartition

2015-08-07 Thread Yan Fang
Hi Jordi,

Sorry for getting back to you late. I was quite busy yesterday.

I think the reason for your error is that you have mismatched Samza and
Kafka versions.

It seems that you are using Samza 0.8.1 with Kafka 0.8.2, which is not a
supported combination.

So my suggestion is to upgrade to *Samza 0.9.1*, then use *Kafka 0.8.2*.
This combination is proven to work.

Hope this helps you.

Thanks,


Fang, Yan
yanfang...@gmail.com

On Thu, Aug 6, 2015 at 1:16 AM, Jordi Blasi Uribarri 
wrote:

> I changed the job name and the store name. I was defining two different
> stores and in case that was the problem, I also eliminated the second one.
> I am getting the same exception.
>
> Exception in thread "main" org.apache.samza.SamzaException: Missing a
> change log offset for SystemStreamPartition [kafka, testdb-changelog, 2].
> at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
> at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
> at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> at
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at
> org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
> at
> org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
> at
> org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
> at
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> at
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> at
> org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
> at
> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
> at
> org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
> at
> org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> at
> org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>
> As I have the autocreate configured in Kafka I am not creating anything
> for the store. Is that ok?
>
> By the way, is there any problem on having two different stores?
>
> Thanks,
>
> Jordi
>
> -Mensaje original-
> De: Yan Fang [mailto:yanfang...@gmail.com]
> Enviado el: miércoles, 05 de agosto de 2015 20:23
> Para: dev@samza.apache.org
> Asunto: Re: Missing a change log offset for SystemStreamPartition
>
> Hi Jordi,
>
> I wonder, the reason of your first exception is that, you changed the task
> number (partition number of your input stream), but still were using the
> same changelog stream. It is trying to send to the partition 2, which does
> not exist?
>
> Can you reproduce this exception in a new job? (new store name, new job
> name)
>
> The second exception is caused by the wrong offset format, I believe.
>
> Let me know how the new job goes.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Wed, Aug 5, 2015 at 12:51 AM, Jordi Blasi Uribarri 
> wrote:
>
> > Hi,
> >
> > I am trying to use the Keystore to manage some state information.
> > Basically this is the code I am using. As long as I have tested, the
> > rest is working correctly.
> >
> > private KeyValueStore storestp;
> >
>

Re: Review Request 37039: SAMZA-748 Coordinator URL always 127.0.0.1

2015-08-06 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37039/#review94370
---



samza-core/src/main/scala/org/apache/samza/util/Util.scala (line 308)
<https://reviews.apache.org/r/37039/#comment148943>

use javadoc format, {@link}, not [[]]



samza-core/src/main/scala/org/apache/samza/util/Util.scala (line 310)
<https://reviews.apache.org/r/37039/#comment148944>

same. not use [[]]


- Yan Fang


On Aug. 4, 2015, 12:42 p.m., József Márton Jung wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37039/
> ---
> 
> (Updated Aug. 4, 2015, 12:42 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> We are using InetAddress.getLocalHost().getHostAddress() for the 
> org.apache.samza.coordinator.server.HttpServer#getUrl. But getLocalHost() may 
> return a loopback address, 127.0.0.1, which is not reachable by other 
> machines.
> 
> Added a method to Util.scala which resolves the first network address which 
> is not a loopback address.
> 
> 
> Diffs
> -
> 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
>  e3adc85 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 27b2517 
>   
> samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
>  dfe3a45 
>   samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala de45123 
>   
> samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
>  6b7f0ba 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 419452c 
>   samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala ead6f94 
>   
> samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java
>  cbf552c 
> 
> Diff: https://reviews.apache.org/r/37039/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> József Márton Jung
> 
>
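The loopback-filtering approach described in the review above can be sketched in plain Java. This is a simplified, hypothetical stand-in for the helper added to Util.scala — the actual patch may differ in naming and in how it chooses among multiple addresses:

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;

public class FirstNonLoopback {

    // Walk all network interfaces and return the first address that is not
    // a loopback address; return null if only loopback addresses exist.
    static InetAddress firstNonLoopback() throws SocketException {
        for (NetworkInterface iface : Collections.list(NetworkInterface.getNetworkInterfaces())) {
            for (InetAddress addr : Collections.list(iface.getInetAddresses())) {
                if (!addr.isLoopbackAddress()) {
                    return addr;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) throws SocketException {
        // Unlike InetAddress.getLocalHost(), this never returns 127.0.0.1
        // when a routable interface address is available.
        System.out.println("resolved: " + firstNonLoopback());
    }
}
```

The design point: `InetAddress.getLocalHost()` consults the host name, which may resolve to 127.0.0.1, whereas enumerating interfaces lets the coordinator advertise an address reachable by other machines.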



Re: Missing a change log offset for SystemStreamPartition

2015-08-05 Thread Yan Fang
Hi Jordi,

I wonder if the reason for your first exception is that you changed the task
count (the partition count of your input stream) but are still using the
same changelog stream. It is trying to send to partition 2, which does
not exist.

Can you reproduce this exception with a new job? (new store name, new job
name)

The second exception is caused by a wrongly formatted offset, I believe.

Let me know how the new job goes.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Wed, Aug 5, 2015 at 12:51 AM, Jordi Blasi Uribarri 
wrote:

> Hi,
>
> I am trying to use the Keystore to manage some state information.
> Basically this is the code I am using. As long as I have tested, the rest
> is working correctly.
>
> private KeyValueStore<String, String> storestp;
>
> public void init(Config config, TaskContext context) {
>  this.storestp = (KeyValueStore<String, String>)
> context.getStore("stepdb");
>}
>
>public void process(IncomingMessageEnvelope envelope,
> MessageCollector collector,
> TaskCoordinator coordinator)
> {
>…
> String str = storestp.get(code)
> …
> }
>
> When I load it, it goes to running but, whe I send the messages through
> Kafka stream It goes to Failed state. I have found this Exception:
> Exception in thread "main" org.apache.samza.SamzaException: Missing a
> change log offset for SystemStreamPartition [kafka, stepdb-changelog, 2].
> at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
> at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
> at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> at
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at
> org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
> at
> org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
> at
> org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
> at
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> at
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> at
> org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
> at
> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
> at
> org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
> at
> org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> at
> org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>
> I have seen that the stepdb-changelog stream exists in Kafka. As a try to
> regenerate the missing offset and tes it I have connected through the
> command line and send a message to the stream. It was received correctly.
> Now I am seeing the following Exception:
>
> Exception in thread "main" java.lang.NullPointerException
> at
> scala.collection.mutable.ArrayOps$ofByte$.length$extension(ArrayOps.scala:126)
> at
> scala.collection.mutable.ArrayOps$ofByte.length(ArrayOps.scala:126)
> at scala.collection.SeqLike$class.size(SeqLike.scala:106)
> at
> scala.collection.mutable.ArrayOps$ofByte.size(ArrayOps.scala:120)
> at
> org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:94)
> at
> org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:79)
> at scala.collection.Iterator$class.f

Re: Review Request 36903: SAMZA-744: shutdown stores before shutdown producers

2015-08-05 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36903/#review94193
---



samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala
 (lines 47 - 49)
<https://reviews.apache.org/r/36903/#comment148730>

we need to remove the author information. :) And maybe add some java doc 
instead.

My 2 cents:
1. If this is a real test, to be consistent, we may want to use 
TestStreamTask (begin with Test), or change all other TestSomething to 
SomethingTest (e.g. change TestStateful to StatefulTest)

2. If this is not a real test, I prefer something like StreamTaskUtil to be 
less ambiguous.



samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala
 (line 96)
<https://reviews.apache.org/r/36903/#comment148740>

is this tag used?



samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala
 (line 148)
<https://reviews.apache.org/r/36903/#comment148741>

same, is this used?



samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala
 (line 169)
<https://reviews.apache.org/r/36903/#comment148742>

There is no "TestJob". (I know, it is copy/paste issue :)



samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala
 (line 176)
<https://reviews.apache.org/r/36903/#comment148752>

why TestStateStoreTask here? I think you mean TestTask.awaitTaskReistered



samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
 (line 64)
<https://reviews.apache.org/r/36903/#comment148745>

From the description, it is not testing the Container Shutdown, actually it 
is testing the store restoring feature.



samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
 (line 66)
<https://reviews.apache.org/r/36903/#comment148734>

Since we are already doing the abstraction, is it possible to put the 
common config into the StreamTaskTest object? Because I see a lot of the same 
configs in ShutdownContainerTest and TestStatefulTask.



samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
 (lines 87 - 89)
<https://reviews.apache.org/r/36903/#comment148755>

in the 0.10.0, we do not have checkpoint factory, I believe



samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
 (lines 142 - 146)
<https://reviews.apache.org/r/36903/#comment148754>

are those two methods used anywhere?



samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
 (line 155)
<https://reviews.apache.org/r/36903/#comment148758>

how about adding override ?



samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
 (line 165)
<https://reviews.apache.org/r/36903/#comment148759>

how about adding override?



samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
 (lines 88 - 91)
<https://reviews.apache.org/r/36903/#comment148756>

same



samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
 (line 95)
<https://reviews.apache.org/r/36903/#comment148757>

Actually I do not understand why we need a companion object here. We just 
use the default task number, 1.

And the awaitTaskRegistered and register methods are not used anywhere.



samza-test/src/test/scala/org/apache/samza/test/integration/TestTask.scala 
(lines 32 - 34)
<https://reviews.apache.org/r/36903/#comment148731>

Instead of the author information, I think putting some java doc explaining 
this class/object will be better.



samza-test/src/test/scala/org/apache/samza/test/integration/TestTask.scala 
(line 37)
<https://reviews.apache.org/r/36903/#comment148749>

rm ";"


- Yan Fang


On Aug. 4, 2015, 9:30 p.m., Yi Pan (Data Infrastructure) wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36903/
> -------
> 
> (Updated Aug. 4, 2015, 9:30 p.m.)
> 
> 
> Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and 
> Navina Ramesh.
> 
> 
> Bugs: SAMZA-744
> https://issues.apache.org/jira/browse/SAMZA-744
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-744: shutdown stores before shutdown producers
> 
> 
> Diffs
> -
> 
>   build.gradle 0852adc4e8e0c2816afd1ebf433f1af6b44852f7 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 27b2517048ad5730762506426ee7578c66181db8 
>   
&

Re: What happens after changelog reaches the Kafka retention

2015-08-04 Thread Yan Fang
Aha, ok, that makes sense. Thanks, Yi.

Fang, Yan
yanfang...@gmail.com

On Tue, Aug 4, 2015 at 3:22 PM, Yi Pan  wrote:

> Hi, Yan,
>
> The changelog topic should be configured as log-compacted topic, which
> means that it will not be deleted due to time-retention.
>
> -Yi
>
> On Tue, Aug 4, 2015 at 3:15 PM, Yan Fang  wrote:
>
> > Hi guys,
> >
> > Have a question about the changelog topic. Currently we are restoring the
> > kv store by reading the whole changelog topic from the Kafka. So what
> will
> > happen after Kafka deletes some log segment after the retention time?
> Will
> > the changelog miss some values?
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
>
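In config terms, the behavior Yi describes means a store's changelog topic is created with log compaction, so Kafka keeps the latest value per key instead of deleting segments by time retention. A hypothetical store definition whose changelog gets this treatment (store and topic names below are examples, not from this thread):

```properties
# Hypothetical key-value store backed by a Kafka changelog topic.
# Samza creates "mystore-changelog" as a log-compacted topic
# (cleanup.policy=compact), so the latest value per key survives
# even after the time-based retention window passes.
stores.mystore.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.mystore.changelog=kafka.mystore-changelog
stores.mystore.key.serde=string
stores.mystore.msg.serde=string
```

If the topic was created by hand instead of by Samza, it is worth verifying with the Kafka topic tools that `cleanup.policy=compact` is actually set; a delete-policy changelog would indeed lose values, as the original question feared.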


Review Request 37102: SAMZA-753: BrokerProxy stop should shutdown kafka consumer first

2015-08-04 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37102/
---

Review request for samza.


Bugs: SAMZA-753
https://issues.apache.org/jira/browse/SAMZA-753


Repository: samza


Description
---

shutdown the kafka consumer before interrupting the BrokerProxy


Diffs
-

  samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 
614f33f 

Diff: https://reviews.apache.org/r/37102/diff/


Testing
---


Thanks,

Yan Fang
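The ordering concern behind this fix, stated generically: stop the thing a worker thread blocks on before interrupting the thread itself, so shutdown completes deterministically. A toy Java sketch of that pattern — this is not the actual BrokerProxy code; the `running` flag below merely stands in for the consumer shutdown:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class OrderlyStop {
    private final AtomicBoolean running = new AtomicBoolean(true);

    // The worker loops "polling"; in BrokerProxy terms, the sleep stands in
    // for a blocking consumer fetch that shutting down the consumer would
    // unblock.
    private final Thread worker = new Thread(() -> {
        while (running.get()) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                return; // interrupted during the blocking call: exit cleanly
            }
        }
    });

    public void start() {
        worker.start();
    }

    // Shut down the blocking dependency first, then interrupt the thread.
    public boolean stop() throws InterruptedException {
        running.set(false);  // step 1: "shut down the consumer"
        worker.interrupt();  // step 2: interrupt the worker thread
        worker.join(1000);
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        OrderlyStop s = new OrderlyStop();
        s.start();
        Thread.sleep(50);
        System.out.println("stopped cleanly: " + s.stop());
    }
}
```

Reversing the two steps in `stop()` is what the bug report describes: interrupting first can leave the worker stuck inside a blocking call that the not-yet-closed dependency never unblocks.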



What happens after changelog reaches the Kafka retention

2015-08-04 Thread Yan Fang
Hi guys,

Have a question about the changelog topic. Currently we are restoring the
kv store by reading the whole changelog topic from the Kafka. So what will
happen after Kafka deletes some log segment after the retention time? Will
the changelog miss some values?

Thanks,

Fang, Yan
yanfang...@gmail.com


Re: log4j configuration

2015-08-04 Thread Yan Fang
Hi Jordi,

This is a little tricky. :)

1. If you want to direct the logs to specific locations, please use the
following two properties, for example:

*task.opts*=-Dsamza.log.dir=/tmp/samza-logs
*yarn.am.opts*=-Dsamza.log.dir=/tmp/samza-master-logs

2. Then why doesn't export SAMZA_LOG_DIR work?

Because this setting only takes effect on your host machine. When you
submit the job, the AM and containers do not inherit the environment
variables from their hosts.

3. Then why can you still see the gc logs?

Though this variable does not affect the AM and containers, it is used by
the YarnClient, which submits the job. So essentially the gc log is for
the YarnClient (it exits after submitting). The log of the YarnClient is
taken care of by the *bin/log4j-console.xml* file, not lib/log4j.xml (that
one is for the AM and all containers).

Hope this makes it a little clearer.
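Put together, a job config fragment using the two properties above might look like this (the paths are examples only):

```properties
# Hypothetical fragment: point container and AM logs at explicit directories.
# task.opts applies to the SamzaContainer JVMs; yarn.am.opts to the AM JVM.
task.opts=-Dsamza.log.dir=/tmp/samza-logs
yarn.am.opts=-Dsamza.log.dir=/tmp/samza-master-logs
```

SAMZA_LOG_DIR exported on the submitting host, by contrast, only influences the YarnClient process, as explained above.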

Thanks,






Fang, Yan
yanfang...@gmail.com

On Tue, Aug 4, 2015 at 12:44 AM, Jordi Blasi Uribarri 
wrote:

> Hi,
>
> I guess this is just a howto question, but I am not able to find how it
> works. I am trying to trace the code of the job I want to execute in Samza.
> I have defined the environment variable as stated in the documentation:
> export SAMZA_LOG_DIR=/opt/logs
>
> I believe that this is working as I have seen that the garbage collector
> correctly generates the gc.o file and writes to it. The directories exists
> in all samza servers (two in my lab).
>
> I have added the following code to the job:
>
> Logger logger = Logger.getLogger(GetConfig.class);
> logger.info("Trace text message");
>
> Although the code is being executed (I can see the messages going through)
> I see no trace written. I don’t have experience with log4j and maybe it is
> there where I have to look but, I am missing something?
>
> Thanks,
>
> Jordi
> 
> Jordi Blasi Uribarri
> Área I+D+i
>
> jbl...@nextel.es
> Oficina Bilbao
>
> [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]
>


Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

2015-08-03 Thread Yan Fang


> On July 25, 2015, 1:33 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala,
> >  line 126
> > <https://reviews.apache.org/r/36163/diff/2/?file=1003380#file1003380line126>
> >
> > add logs for the case where the topic is already existied. Log the 
> > metadata information. (like the original createStream code does)
> 
> Robert Zuljevic wrote:
> This is already done via KafkaSystemAdmin's createChangelogStream method. 
> Do you want me not to call this method there, but rather call it in 
> JobCoordinator, right after cration?

No, they are different. What I mean is to add an "additional" log here,
something like:
_val changeLogMetadata =
streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet)
info("Got change log stream metadata: %s" format changeLogMetadata)_
The goal is to make sure we create the correct changelog in the AM.


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93019
---


On July 9, 2015, 2:39 p.m., Robert Zuljevic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36163/
> ---
> 
> (Updated July 9, 2015, 2:39 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Removed trailing whitespaces
> 
> 
> Diffs
> -
> 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
> 7a588ebc99b5f07d533e48e10061a3075a63665a 
>   
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
>  249b8ae3a904716ea51a2b27c7701ac30d13b854 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> aeba61a95371faaba23c97d896321b8d95467f87 
>   
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
>  097f41062f3432ae9dc9a9737b48ed7b2f709f20 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
> 8d54c4639fc226b34e64915935c1d90e5917af2e 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
>  d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
>  35086f54f526d5d88ad3bc312b71fce40260e7c6 
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
> b063366f0f60e401765a000fa265c59dee4a461e 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
>  1e936b42a5b9a4bfb43766c17847b2947ebdb21d 
> 
> Diff: https://reviews.apache.org/r/36163/diff/
> 
> 
> Testing
> ---
> 
> I wasn't really sure what kind of test (unit test / integration test) I 
> should make here, so any pointers would be greatly appreaciated! I tested the 
> change with the unit/integration tests already available.
> 
> 
> Thanks,
> 
> Robert Zuljevic
> 
>



Re: samza environment variable on containers

2015-08-03 Thread Yan Fang
Maybe @Eli Reisman can give you some insights, since he is writing the HDFS
producer, and the exception looks like it is related to the HDFS consumer.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Sun, Aug 2, 2015 at 12:41 PM, Chen Song  wrote:

> Thanks Yan.
>
> I ran into issues when testing jobs on kerberized cluster. The job reads
> from HDFS and it worked well before. After testing on kerberized cluster,
> the Samza container threw exception as below. I am not sure how kerberos
> has anything to do with this.
>
> java.lang.UnsatisfiedLinkError:
> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
> at
> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
> at
>
> org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
> at
>
> org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:192)
> at
> org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:176)
> at
> org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1916)
> at
> org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1811)
> at
> org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1760)
> at
>
> com.appnexus.data.samza.hdfs.HdfsProtobufSequenceFileReaderWriter.messages(HdfsProtobufSequenceFileReaderWriter.scala:20)
> at
>
> com.appnexus.data.samza.systems.HdfsSystemConsumer$$anonfun$poll$1$$anonfun$1.apply(HdfsSystemConsumer.scala:84)
> at
>
> com.appnexus.data.samza.systems.HdfsSystemConsumer$$anonfun$poll$1$$anonfun$1.apply(HdfsSystemConsumer.scala:76)
>
> After googling a lot, I stumbled upon this thread,
>
> http://stackoverflow.com/questions/22150417/hadoop-mapreduce-java-lang-unsatisfiedlinkerror-org-apache-hadoop-util-nativec
> .
>
> If anyone has any thoughts on this error, please advise.
>
> Chen
>
> On Thu, Jul 30, 2015 at 5:15 PM, Yan Fang  wrote:
>
> > Hi Chen Song,
> >
> > I do not think there is a way in Samza with which you can specify the ENV
> > for Samza container.
> >
> > And currently Samza does not read the LD_LIBRARY_PATH either.
> >
> > Samza only puts the files in lib/*.[jw]ar into the CLASSPATH.
> >
> > Though -Djava.library.path might work,  it will cause hadoop errors. :(
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Thu, Jul 30, 2015 at 7:05 AM, Chen Song 
> wrote:
> >
> > > Maybe a dumb question.
> > >
> > > Is there a way to set an ENV for samza containers?
> > >
> > > We want to set LD_LIBRARY_PATH to include hadoop native libs.
> > >
> > > --
> > > Chen Song
> > >
> >
>
>
>
> --
> Chen Song
>


Re: testThreadInterruptInOperationSleep on clean installation

2015-08-03 Thread Yan Fang
Hi Jordi,

Those two exceptions seem to be caused by a race condition. Since I cannot
reproduce it, can you try to 1) kill all the GradleDaemon and
GradleWrapperMain processes before rerunning the build, and 2) run those two
tests in Eclipse (or some other way) without Gradle? I suspect both are
related to Gradle.
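For reference, a minimal Java sketch of the interrupt-during-sleep pattern this
kind of test exercises (hypothetical class name, not Samza's actual test code --
the real TestExponentialSleepStrategy is in Scala):

```java
// Sketch: interrupt a sleeping thread and count the observed interrupts.
public class InterruptDemo {
    // Starts a thread that sleeps, interrupts it, and returns how many
    // InterruptedExceptions the thread observed.
    static int run() throws InterruptedException {
        final int[] interrupted = {0};
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(10_000); // long sleep, expected to be interrupted
            } catch (InterruptedException e) {
                interrupted[0]++;
            }
        });
        t.start();
        Thread.sleep(100); // crude wait for the thread to reach sleep();
                           // this window is where timing races creep in
        t.interrupt();
        t.join();
        return interrupted[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints 1
    }
}
```

Even here the result only stays deterministic because an interrupt delivered
before sleep() makes sleep() throw immediately; assertions that depend on finer
timing are exactly the kind that turn flaky under a loaded Gradle daemon.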

Thanks,

Fang, Yan
yanfang...@gmail.com

On Sun, Aug 2, 2015 at 11:45 PM, Jordi Blasi Uribarri 
wrote:

> Hi,
>
> I am trying to do a clean installation of Samza on a newly installed
> Debian 7.8 box. Following the stpes I collected in a previous 0.8.2 Samza
> installation I have performed the following steps:
>
> apt-get install openjdk-7-jdk openjdk-7-jre git maven curl
> vi /root/.bashrc
> export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
> export CLASSPATH=$CLASSPATH:/usr/share/java
>
> cd /opt
> git clone http://git-wip-us.apache.org/repos/asf/samza.git
> cd samza
> ./gradlew clean build
>
>
> Every time I run it I get an error on the test the script runs:
> testThreadInterruptInOperationSleep
java.lang.AssertionError: expected:<1> but was:<0>
>at org.junit.Assert.fail(Assert.java:91)
>at org.junit.Assert.failNotEquals(Assert.java:645)
>at org.junit.Assert.assertEquals(Assert.java:126)
>at org.junit.Assert.assertEquals(Assert.java:470)
>at org.junit.Assert.assertEquals(Assert.java:454)
>at
> org.apache.samza.util.TestExponentialSleepStrategy.testThreadInterruptInOperationSleep(TestExponentialSleepStrategy.scala:158)
>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>at java.lang.reflect.Method.invoke(Method.java:606)
>at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
>at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
>at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
>at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
>at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:76)
>at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
>at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
>at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
>at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
>at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
>at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
>at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86)
>at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49)
>at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69)
>at
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48)
>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>at java.lang.reflect.Method.invoke(Method.java:606)
>at
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>at
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>at
> org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>at
> org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>at
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105)
>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>at java.lang.reflect.Method.invoke(Method.java:606)
>at
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>at
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>at
> org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:355)
>at
> org.gradle.internal.concurrent

Re: Review Request 37039: SAMZA-748 Coordinator URL always 127.0.0.1

2015-08-03 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37039/#review93971
---



samza-core/src/main/scala/org/apache/samza/util/Util.scala (line 312)
<https://reviews.apache.org/r/37039/#comment148445>

the result is not correct; it returns something like 
fe80:0:0:0:3e15:c2ff:feca:f0fc%4

also, we need to test whether the address is site-local 
(isSiteLocalAddress). Sometimes more than one address is site-local.

Looked at Spark's code for reference:


https://github.com/apache/spark/blob/6b2baec04fa3d928f0ee84af8c2723ac03a4648c/core/src/main/scala/org/apache/spark/util/Utils.scala#L835-L874

That code appears to return the desired result.

Thank you.
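For illustration, the kind of filtering discussed can be sketched with plain
JDK APIs (class and method names here are hypothetical; both Samza's Util.scala
and Spark's Utils.scala differ in details such as Windows handling and address
ordering):

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Collections;

public class LocalHostResolver {
    // Returns a non-loopback IPv4 address, preferring site-local ones
    // (10.x, 172.16-31.x, 192.168.x); this avoids results like
    // 127.0.0.1 and scoped IPv6 such as fe80:0:0:0:3e15:c2ff:feca:f0fc%4.
    public static InetAddress firstNonLoopbackAddress() {
        try {
            InetAddress candidate = null;
            for (NetworkInterface ni : Collections.list(NetworkInterface.getNetworkInterfaces())) {
                for (InetAddress addr : Collections.list(ni.getInetAddresses())) {
                    // skip loopback and IPv6 addresses
                    if (addr.isLoopbackAddress() || !(addr instanceof Inet4Address)) {
                        continue;
                    }
                    if (addr.isSiteLocalAddress()) {
                        return addr; // typically reachable within the cluster
                    }
                    if (candidate == null) {
                        candidate = addr; // remember a public address as fallback
                    }
                }
            }
            return candidate != null ? candidate : InetAddress.getLocalHost();
        } catch (Exception e) {
            return InetAddress.getLoopbackAddress(); // last resort
        }
    }

    public static void main(String[] args) {
        System.out.println(firstNonLoopbackAddress().getHostAddress());
    }
}
```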


- Yan Fang


On Aug. 3, 2015, 1:44 p.m., József Márton Jung wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37039/
> ---
> 
> (Updated Aug. 3, 2015, 1:44 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> We are using InetAddress.getLocalHost().getHostAddress() for the 
> org.apache.samza.coordinator.server.HttpServer#getUrl. But getLocalHost() may 
> return a loopback address, 127.0.0.1, which is not reachable by other 
> machines.
> 
> Added a method to Util.scala which resolves the first network address which 
> is not a loopback address.
> 
> 
> Diffs
> -
> 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
>  e5ab4fb 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 27b2517 
>   
> samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
>  dfe3a45 
>   samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala de45123 
>   
> samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
>  dcf0435 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 419452c 
>   
> samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java
>  cbf552c 
> 
> Diff: https://reviews.apache.org/r/37039/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> József Márton Jung
> 
>



Review Request 36973: SAMZA-626: tool to read the RocksDb in a running job

2015-07-30 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36973/
---

Review request for samza.


Bugs: SAMZA-626
https://issues.apache.org/jira/browse/SAMZA-626


Repository: samza


Description
---

*refactored some java code

*changed RocksDbKeyValueStore.options from scala to java

*moved default serde name from container to util, because it is useful to other 
classes

*added a class to read the running rocksdb

*added a commondline tool

*updated the doc accordingly


Diffs
-

  build.gradle 0852adc 
  docs/learn/documentation/versioned/container/state-management.md 50d4b65 
  samza-core/src/main/java/org/apache/samza/config/JavaSerializerConfig.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java 
af7d4ca 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
27b2517 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala 419452c 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
84fdeaa 
  samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala ead6f94 
  
samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
 PRE-CREATION 
  
samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueStoreHelper.java
 PRE-CREATION 
  
samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbReadingTool.java
 PRE-CREATION 
  
samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
 571a50e 
  
samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
 a423f7b 
  
samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueReader.java
 PRE-CREATION 
  samza-shell/src/main/bash/read-rocksdb-tool.sh PRE-CREATION 

Diff: https://reviews.apache.org/r/36973/diff/


Testing
---


Thanks,

Yan Fang



Re: Coordinator URL always 127.0.0.1

2015-07-30 Thread Yan Fang
Created https://issues.apache.org/jira/browse/SAMZA-748

Fang, Yan
yanfang...@gmail.com

On Thu, Jul 30, 2015 at 7:17 PM, Yi Pan  wrote:

> +1 on the fix in 0.10.0. It should be an easy one.
>
> On Thu, Jul 30, 2015 at 7:08 PM, Yan Fang  wrote:
>
> > Hi Thommy,
> >
> > {quote}
> > Because I don't see how this is ever going to work in scenarios where the
> > AM is on a different node than the containers.
> > {quote}
> >
> > -- I do not quite understand this part. AM essentially is running in a
> > container as well. And the http server is brought up in the same
> container.
> >
> > {quote}
> > even if we can't get a better address for the AM from YARN, we could at
> > least filter the addresses we get back from the JVM to exclude loopbacks.
> > {quote}
> >
> > -- You are right. InetAddress.getLocalHost() gives back loopback address
> > sometimes. We should filter this out. Just googling one possible solution
> > <http://www.coderanch.com/t/491883/java/java/IP> .
> >
> > + @Yi, @Navina,
> >
> > Also, I think this fix should go to the 0.10.0 release.
> >
> > What do you guys think?
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Thu, Jul 30, 2015 at 6:39 PM, Yan Fang  wrote:
> >
> > > Just one point to add:
> > >
> > > {quote}
> > > AM gets notified of container status from the RM.
> > > {quote}
> > >
> > > I think this is not 100% correct. AM can communicate with NM through
> > > NMClientAsync
> > > <
> >
> https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/yarn/client/api/async/NMClientAsync.html
> >
> > to
> > > get container status, though Samza does not implement the
> > CallbackHandler.
> > >
> > > Thanks,
> > >
> > > Fang, Yan
> > > yanfang...@gmail.com
> > >
> > > On Thu, Jul 30, 2015 at 6:06 PM, Navina Ramesh <
> > > nram...@linkedin.com.invalid> wrote:
> > >
> > >> The NM (and hence, by extension the container) heartbeats to the RM,
> not
> > >> the AM. AM gets notified of container status from the RM.
> > >> The AM starts / stops /releases a container process by communicating
> to
> > >> the
> > >> NM.
> > >>
> > >> Navina
> > >>
> > >>
> > >> On Thu, Jul 30, 2015 at 5:55 PM, Thomas Becker 
> > wrote:
> > >>
> > >> > Ok, I thought there was some communication from the container to the
> > AM,
> > >> > it sounds like you're saying it's in the other direction only?
> Don't
> > >> > containers heartbeat to the AM?  Regardless, even if we can't get a
> > >> better
> > >> > address for the AM from YARN, we could at least filter the addresses
> > we
> > >> get
> > >> > back from the JVM to exclude loopbacks.
> > >> >
> > >> > -Tommy
> > >> > 
> > >> > From: Navina Ramesh [nram...@linkedin.com.INVALID]
> > >> > Sent: Thursday, July 30, 2015 8:40 PM
> > >> > To: dev@samza.apache.org
> > >> > Subject: Re: Coordinator URL always 127.0.0.1
> > >> >
> > >> > Hi Tommy,
> > >> > Yi is right. Container start is coordinated by the AppMaster using
> an
> > >> > NMClient. Container host name and port is provided by the RM during
> > >> > allocation.
> > >> > In Yarn (at least, afaik), when the node joins a cluster, the NM
> > >> registers
> > >> > itself with the RM. So, the NM might still be using
> > >> > getLocalhost.getAddress().
> > >> >
> > >> > I don't know of any other way to programmatically fetch the
> machine's
> > >> > hostname (apart from some hacky shell commands).
> > >> >
> > >> > Cheers,
> > >> > Navina
> > >> >
> > >> > On Thu, Jul 30, 2015 at 5:23 PM, Yi Pan 
> wrote:
> > >> >
> > >> > > Hi, Tommy,
> > >> > >
> > >> > > Yeah, I agree that the current implementation is not bullet-proof
> to
> > >> any
> > >> > > different networking configuration on the host. As for the AM <->
> > >> > container
> > >> > > communi

Re: Coordinator URL always 127.0.0.1

2015-07-30 Thread Yan Fang
Hi Thommy,

{quote}
Because I don't see how this is ever going to work in scenarios where the
AM is on a different node than the containers.
{quote}

-- I do not quite understand this part. The AM essentially runs in a
container as well, and the HTTP server is brought up in the same container.

{quote}
even if we can't get a better address for the AM from YARN, we could at
least filter the addresses we get back from the JVM to exclude loopbacks.
{quote}

-- You are right. InetAddress.getLocalHost() sometimes gives back a loopback
address. We should filter this out. A quick search turned up one possible
solution <http://www.coderanch.com/t/491883/java/java/IP>.

+ @Yi, @Navina,

Also, I think this fix should go to the 0.10.0 release.

What do you guys think?

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Jul 30, 2015 at 6:39 PM, Yan Fang  wrote:

> Just one point to add:
>
> {quote}
> AM gets notified of container status from the RM.
> {quote}
>
> I think this is not 100% correct. AM can communicate with NM through
> NMClientAsync
> <https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/yarn/client/api/async/NMClientAsync.html>
>  to
> get container status, though Samza does not implement the CallbackHandler.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Thu, Jul 30, 2015 at 6:06 PM, Navina Ramesh <
> nram...@linkedin.com.invalid> wrote:
>
>> The NM (and hence, by extension the container) heartbeats to the RM, not
>> the AM. AM gets notified of container status from the RM.
>> The AM starts / stops /releases a container process by communicating to
>> the
>> NM.
>>
>> Navina
>>
>>
>> On Thu, Jul 30, 2015 at 5:55 PM, Thomas Becker  wrote:
>>
>> > Ok, I thought there was some communication from the container to the AM,
>> > it sounds like you're saying it's in the other direction only?  Don't
>> > containers heartbeat to the AM?  Regardless, even if we can't get a
>> better
>> > address for the AM from YARN, we could at least filter the addresses we
>> get
>> > back from the JVM to exclude loopbacks.
>> >
>> > -Tommy
>> > 
>> > From: Navina Ramesh [nram...@linkedin.com.INVALID]
>> > Sent: Thursday, July 30, 2015 8:40 PM
>> > To: dev@samza.apache.org
>> > Subject: Re: Coordinator URL always 127.0.0.1
>> >
>> > Hi Tommy,
>> > Yi is right. Container start is coordinated by the AppMaster using an
>> > NMClient. Container host name and port is provided by the RM during
>> > allocation.
>> > In Yarn (at least, afaik), when the node joins a cluster, the NM
>> registers
>> > itself with the RM. So, the NM might still be using
>> > getLocalhost.getAddress().
>> >
>> > I don't know of any other way to programmatically fetch the machine's
>> > hostname (apart from some hacky shell commands).
>> >
>> > Cheers,
>> > Navina
>> >
>> > On Thu, Jul 30, 2015 at 5:23 PM, Yi Pan  wrote:
>> >
>> > > Hi, Tommy,
>> > >
>> > > Yeah, I agree that the current implementation is not bullet-proof to
>> any
>> > > different networking configuration on the host. As for the AM <->
>> > container
>> > > communication, if I am not mistaken, it is through the NMClient and
>> the
>> > > node HTTP address is wrapped within the Container object returned from
>> > RM.
>> > > I am not very familiar with that part of source code. Navina may be
>> able
>> > to
>> > > help more here.
>> > >
>> > > -Yi
>> > >
>> > > On Thu, Jul 30, 2015 at 4:27 PM, Thomas Becker 
>> > wrote:
>> > >
>> > > > Hi Yi,
>> > > > Thanks a lot for your reply.  I don't doubt we can get it to work by
>> > > > mucking with the networking configuration, but to me this feels
>> like a
>> > > > workaround, not a solution.
>> > InetAddress.getLocalHost().getHostAddress()
>> > > is
>> > > > not a reliable way of obtaining an IP that other machines can
>> connect
>> > to.
>> > > > Just today I tested on several Linux distros and it did not work on
>> any
>> > > of
>> > > > them.  Can we do something more robust here?  How does the container
>> > > > communicate status to the AM?
>> > > >
>> > > > -Tommy
>> > > >
>> > > > __

Re: [DISCUSS] Release 0.10.0

2015-07-30 Thread Yan Fang
For SAMZA-747, may ping Naveen or Chris. :)

They have the permission to publish to maven. From the discuss
<https://github.com/facebook/rocksdb/issues/606>, they seem ready for the
release.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Jul 30, 2015 at 5:35 PM, Navina Ramesh  wrote:

> Ok. I think it got confusing because you are talking about tickets NOT to
> be included in the release :)
>
> Got it, now. +1 for this list of exclusions!
> SAMZA-723 (StreamAppender bug) and SAMZA-747 (rocksdb) should be in 0.10.0.
> I think we don't have an ETA on the fix for SAMZA-747?
>
> Thanks!
> Navina
>
> On Thu, Jul 30, 2015 at 5:26 PM, Yi Pan  wrote:
>
> > Uh... wrong math all the day today... :(
> >
> > Let me re-try:
> >
> > 29/32 tickets are to be moved to later (i.e. excluded from 0.10.0).
> >
> > 2/32 tickets (including SAMZA-723) will be included
> >
> > 1/32 ticket (SAMZA-689) is up for discussion and I am leaning toward mark
> > it as won't fix.
> >
> > -Yi
> >
> > On Thu, Jul 30, 2015 at 5:24 PM, Yi Pan  wrote:
> >
> > > Hi, Navina,
> > >
> > > The 29/30 tickets are to be excluded from 0.10.0 (i.e. moved to
> 0.11.0),
> > > the three tickets are either to be included in 0.10.0, or won't fix.
> > >
> > > -Yi
> > >
> > > On Thu, Jul 30, 2015 at 5:10 PM, Navina Ramesh <
> > > nram...@linkedin.com.invalid> wrote:
> > >
> > >> Hi Yi,
> > >> Thanks for summarizing. But why are we excluding SAMZA-723 from the
> > >> current
> > >> release ? Doesn't this break the existing StreamAppender functionality
> > in
> > >> 0.9?
> > >>
> > >> Thanks!
> > >> Navina
> > >>
> > >> On Thu, Jul 30, 2015 at 4:55 PM, Yi Pan  wrote:
> > >>
> > >> > Sorry, hit the send button too fast. Let me correct the summary
> > section:
> > >> >
> > >> > 29/32 tickets Status=open/reopen :
> > >> >
> > >> >
> > >>
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SAMZA%20AND%20status%20in%20(Open%2C%20Reopened)%20AND%20fixVersion%20%3D%200.10.0%20ORDER%20BY%20due%20ASC%2C%20priority%20DESC%2C%20created%20ASC
> > >> >
> > >> > Three tickets excluded:
> > >> > - SAMZA-689, @Chinmay, were you able to re-produce it? I am thinking
> > of
> > >> > mark it as won't fix since 0.10.0 will remove the checkpoint topic
> > >> > altogether.
> > >> > - SAMZA-747, we will have to upgrade to RocksDB 3.11.1 to include
> the
> > >> fix
> > >> > that breaks Samza 0.10.0 build on Linux boxes
> > >> > - SAMZA-723, stream appender deadlock issue
> > >> >
> > >> > Thanks!
> > >> >
> > >> > -Yi
> > >> >
> > >> >
> > >> > On Thu, Jul 30, 2015 at 4:52 PM, Yi Pan 
> wrote:
> > >> >
> > >> > > Hi, all,
> > >> > >
> > >> > > Thanks a lot for helping out to select the features planned in
> > 0.10.0.
> > >> > >
> > >> > > Based on the above discussion, I am proposing to move the
> following
> > >> > > tickets later (i.e. 0.11.0).
> > >> > >
> > >> > > 30/32 tickets Status=open/reopen (exception for SAMZA-723: stream
> > >> > appender
> > >> > > deadlock issue) :
> > >> > >
> > >> >
> > >>
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SAMZA%20AND%20status%20in%20(Open%2C%20Reopened)%20AND%20fixVersion%20%3D%200.10.0%20ORDER%20BY%20due%20ASC%2C%20priority%20DESC%2C%20created%20ASC
> > >> > >
> > >> > > Two tickets excluded:
> > >> > > - SAMZA-689, @Chinmay, were you able to re-produce it? I am
> thinking
> > >> of
> > >> > > mark it as won't fix since 0.10.0 will remove the checkpoint topic
> > >> > > altogether.
> > >> > > - SAMZA-747, we will have to upgrade to RocksDB 3.11.1 to include
> > the
> > >> fix
> > >> > > that breaks Samza 0.10.0 build on Linux boxes
> > >> > >
> > >> > > Anything that I missed?
> > >> > >
> > >> > > Thanks!
> > >> > >
> > >> > > -Yi
> &

Re: Coordinator URL always 127.0.0.1

2015-07-30 Thread Yan Fang
Just one point to add:

{quote}
AM gets notified of container status from the RM.
{quote}

I think this is not 100% correct. The AM can communicate with the NM through
NMClientAsync
<https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/yarn/client/api/async/NMClientAsync.html>
to get container status, though Samza does not implement the CallbackHandler.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Jul 30, 2015 at 6:06 PM, Navina Ramesh  wrote:

> The NM (and hence, by extension the container) heartbeats to the RM, not
> the AM. AM gets notified of container status from the RM.
> The AM starts / stops /releases a container process by communicating to the
> NM.
>
> Navina
>
>
> On Thu, Jul 30, 2015 at 5:55 PM, Thomas Becker  wrote:
>
> > Ok, I thought there was some communication from the container to the AM,
> > it sounds like you're saying it's in the other direction only?  Don't
> > containers heartbeat to the AM?  Regardless, even if we can't get a
> better
> > address for the AM from YARN, we could at least filter the addresses we
> get
> > back from the JVM to exclude loopbacks.
> >
> > -Tommy
> > 
> > From: Navina Ramesh [nram...@linkedin.com.INVALID]
> > Sent: Thursday, July 30, 2015 8:40 PM
> > To: dev@samza.apache.org
> > Subject: Re: Coordinator URL always 127.0.0.1
> >
> > Hi Tommy,
> > Yi is right. Container start is coordinated by the AppMaster using an
> > NMClient. Container host name and port is provided by the RM during
> > allocation.
> > In Yarn (at least, afaik), when the node joins a cluster, the NM
> registers
> > itself with the RM. So, the NM might still be using
> > getLocalhost.getAddress().
> >
> > I don't know of any other way to programmatically fetch the machine's
> > hostname (apart from some hacky shell commands).
> >
> > Cheers,
> > Navina
> >
> > On Thu, Jul 30, 2015 at 5:23 PM, Yi Pan  wrote:
> >
> > > Hi, Tommy,
> > >
> > > Yeah, I agree that the current implementation is not bullet-proof to
> any
> > > different networking configuration on the host. As for the AM <->
> > container
> > > communication, if I am not mistaken, it is through the NMClient and the
> > > node HTTP address is wrapped within the Container object returned from
> > RM.
> > > I am not very familiar with that part of source code. Navina may be
> able
> > to
> > > help more here.
> > >
> > > -Yi
> > >
> > > On Thu, Jul 30, 2015 at 4:27 PM, Thomas Becker 
> > wrote:
> > >
> > > > Hi Yi,
> > > > Thanks a lot for your reply.  I don't doubt we can get it to work by
> > > > mucking with the networking configuration, but to me this feels like
> a
> > > > workaround, not a solution.
> > InetAddress.getLocalHost().getHostAddress()
> > > is
> > > > not a reliable way of obtaining an IP that other machines can connect
> > to.
> > > > Just today I tested on several Linux distros and it did not work on
> any
> > > of
> > > > them.  Can we do something more robust here?  How does the container
> > > > communicate status to the AM?
> > > >
> > > > -Tommy
> > > >
> > > > 
> > > > From: Yi Pan [nickpa...@gmail.com]
> > > > Sent: Thursday, July 30, 2015 6:48 PM
> > > > To: dev@samza.apache.org
> > > > Subject: Re: Coordinator URL always 127.0.0.1
> > > >
> > > > Hi, Tommy,
> > > >
> > > > I think that it might be a commonly asked question regarding to
> > multiple
> > > > IPs on a single host. A common trick w/o changing code is (copied
> from
> > > SO:
> > > >
> > > >
> > >
> >
> http://stackoverflow.com/questions/2381316/java-inetaddress-getlocalhost-returns-127-0-0-1-how-to-get-real-ip
> > > > )
> > > >
> > > > {code}
> > > >
> > > >1.
> > > >
> > > >Find your host name. Type: hostname. For example, you find your
> > > hostname
> > > >is mycomputer.xzy.com
> > > >2.
> > > >
> > > >Put your host name in your hosts file. /etc/hosts . Such as
> > > >
> > > >10.50.16.136 mycomputer.xzy.com
> > > >
> > > >
> > > > {code}
> > > >
> > > > -Yi
> > > >
> > > > On Thu, Jul 30, 2015 at 11:35 AM, Tommy Becker 
> > > wrote:
> > > >
> > > > > We are testing some jobs on a YARN grid and noticed they are often
> > not
> > > > > starting up properly due to being unable to connect to the job
> > > > coordinator.
> > > > > After some investigation it seems as if the jobs are always
> getting a
> > > > > coordinator URL of http://127.0.0.1:  But my understanding
> is
> > > that
> > > > > the coordinator runs only in the AM, so I'd expect these URLs to
> more
> > > > often
> > > > > than not be to some other machine.  Looking at the code however,
> I'm
> > > not
> > > > > sure how that would ever happen since the URL for the coordinator
> > > always
> > > > > comes from InetAddress.getLocalHost().getHostAddress() in
> > > > > org.apache.samza.coordinator.server.HttpServer#getUrl
> > > > >
> > > > > Am I off base here?  Because I don't see how this is ever going to
> > work
> > > > in
> > > > > scenarios where the AM is on a differ

Re: Review Request 35445: SAMZA-693: Very basic HDFS Producer service for Samza

2015-07-30 Thread Yan Fang


> On July 30, 2015, 6:59 p.m., Yan Fang wrote:
> > samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/JobNameDateTimeBucketer.scala,
> >  line 40
> > <https://reviews.apache.org/r/35445/diff/4/?file=1023371#file1023371line40>
> >
> > My overall concern here is that, if more than one task is running, is it 
> > possible that all the tasks are writing to one file at the same time?
> 
> Eli Reisman wrote:
> I don't think so, each registered source should be using its own 
> HdfsWriter in write() calls even on the same Producer and the filenames per 
> writer are unique-ified in the writer impl. There are other ways to 
> accomplish that uniqueness though.

I see. We are using UUID.randomUUID to make sure the writers write to 
different files. This is fine unless we win the lottery. :)
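For illustration, the uniqueness scheme amounts to something like this
(hypothetical helper; the actual HdfsWriter implementations build fuller
bucketed paths):

```java
import java.util.UUID;

public class UniqueName {
    // Each writer derives its own file name from a random UUID, so two
    // writers in the same directory collide only if two UUIDs collide
    // (probability on the order of 2^-122 -- the "lottery").
    static String partFileName(String prefix) {
        return prefix + "-" + UUID.randomUUID() + ".seq";
    }

    public static void main(String[] args) {
        String a = partFileName("part");
        String b = partFileName("part");
        System.out.println(a.equals(b)); // prints false
    }
}
```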


> On July 30, 2015, 6:59 p.m., Yan Fang wrote:
> > samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/TextSequenceFileHdfsWriter.scala,
> >  line 37
> > <https://reviews.apache.org/r/35445/diff/4/?file=1023373#file1023373line37>
> >
> > I would prefer the param idea because 1) Samza is already using this 
> > fashion 2) less code especially when there are more SequenceFileHdfsWriter 
> > come out (LongWritable, etc)
> > 
> > "like the casting of the outgoing message to something not-Writable 
> > like Array[Byte] or String might require a third param and it might start 
> > to get awkward"
> > 
> > -- We can always cast the outgoing msg to Array[Byte] using the serde 
> > defined for this msg. So as long as the Wriable accepts Array[Byte], this 
> > should be fine.
> > 
> > "Also there are some Writable types that would not allow us to 
> > determine message size for batching purposes the way "
> > 
> > -- I think we can either give it a default size (this can be 
> > configurable) when there is not getLength method or use a subclass. Either 
> > way will be fine.
> 
> Eli Reisman wrote:
> I definitely agree on the less code point, and I think we can move 
> functions like the compression selection to the base class.
> 
> But, I don't think we can just cast to Array[Byte] for all the Writable 
> types to accept the message, even from the serde. Only Text and BytesWritable 
> will accept Array[Byte] messages, so we will be limited to just those two 
> types forever if we are only using that cast on the outgoing message before 
> wrapping it in the Writable. If that works (i.e. messages will never be 
> FloatWritable, LongWritable etc.) then generics will work there.
> 
> But the getLength issue still presents a problem. We already have a 
> configuration to set a batch size default or user-defined one, but getLength 
> is called per-message-write, and it's how we track how big the current file 
> is. We won't know when to split or when we hit that configured size without 
> tracking it. Each Writable will need slightly different logic to pick up or 
> estimate message size, they don't all supply a getLength call for byte size.
> 
> So again that seems to force us to only work with BytesWritable and Text 
> value types? If I'm completely missing something here please let me know and 
> we can make the desired changes. Thanks for the input!

I see. Considering those facts, IMO, moving the common code, such as 
getCompression and getBuckets, to their parent class is sufficient.

BTW, in

    def getBucketer(systemName: String, config: HdfsConfig) = {
      new JobNameDateTimeBucketer(systemName, config)
    }

should it be Bucketer.getInstance(systemName, config)?

If we move it to the parent class, then just "bucketer", I think.


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35445/#review93614
---


On July 28, 2015, 5:25 a.m., Eli Reisman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/35445/
> ---
> 
> (Updated July 28, 2015, 5:25 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-693: Very basic HDFS Producer service for Samza
> 
> 
> Diffs
> -
> 
>   build.gradle 0852adc 
>   docs/learn/documentation/versioned/hdfs/producer.md PRE-CREATION 
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConf

Re: samza environment variable on containers

2015-07-30 Thread Yan Fang
Hi Chen Song,

I do not think there is a way in Samza to specify environment variables for
the Samza containers.

And currently Samza does not read the LD_LIBRARY_PATH either.

Samza only puts the files in lib/*.[jw]ar into the CLASSPATH.

Though -Djava.library.path might work, it will cause Hadoop errors. :(

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Jul 30, 2015 at 7:05 AM, Chen Song  wrote:

> Maybe a dumb question.
>
> Is there a way to set an ENV for samza containers?
>
> We want to set LD_LIBRARY_PATH to include hadoop native libs.
>
> --
> Chen Song
>
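For anyone hitting this, a small self-contained check of what the JVM inside a
container actually sees can help narrow things down (hypothetical class name;
run it as a one-off on the node):

```java
// Prints the two paths that matter for native-library loading.
public class NativePathCheck {
    public static void main(String[] args) {
        // java.library.path is what System.loadLibrary() searches
        System.out.println("java.library.path = " + System.getProperty("java.library.path"));
        // LD_LIBRARY_PATH is what the dynamic linker consults for transitive
        // native dependencies (e.g. libhadoop.so pulling in libsnappy.so)
        System.out.println("LD_LIBRARY_PATH   = " + System.getenv("LD_LIBRARY_PATH"));
    }
}
```

If LD_LIBRARY_PATH prints null inside the container while the Hadoop native
libs live outside the default linker path, that is consistent with the
behavior described above.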


Re: Review Request 35445: SAMZA-693: Very basic HDFS Producer service for Samza

2015-07-30 Thread Yan Fang


> On July 29, 2015, 7:41 p.m., Navina Ramesh wrote:
> > docs/learn/documentation/versioned/hdfs/producer.md, line 24
> > 
> >
> > Can you please update the list of available configuration for this 
> > system at 
> > http://samza.apache.org/learn/documentation/latest/jobs/configuration-table.html
> >  as well? 
> > 
> > Like we did with elastic search, adding an example job to hello-samza 
> > will be very useful for adoption. If not in this JIRA, please consider 
> > opening a follow-up JIRA to add this example.
> 
> Eli Reisman wrote:
> Yeah I can do that. I think adding an example to hello-samza is a great 
> idea, but I agree I would make that a separate ticket and then I'd claim that 
> ticket.
> 
> Eli Reisman wrote:
> I'd also be interested in taking some follow up tickets for more output 
> formats than just seq files once this is out, and possibly the reader 
> implemention too although we'd need some up front discussion on the JIRA 
> ticket about what that looks like in a long-lived Samza job that needs to 
> read "forever"

That will be great ! :)


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35445/#review93501
---


On July 28, 2015, 5:25 a.m., Eli Reisman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/35445/
> ---
> 
> (Updated July 28, 2015, 5:25 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-693: Very basic HDFS Producer service for Samza
> 
> 
> Diffs
> -
> 
>   build.gradle 0852adc 
>   docs/learn/documentation/versioned/hdfs/producer.md PRE-CREATION 
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala 
> PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala 
> PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducerMetrics.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/BinarySequenceFileHdfsWriter.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/Bucketer.scala 
> PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/HdfsWriter.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/JobNameDateTimeBucketer.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/SequenceFileHdfsWriter.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/TextSequenceFileHdfsWriter.scala
>  PRE-CREATION 
>   samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties 
> PRE-CREATION 
>   samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties 
> PRE-CREATION 
>   samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties 
> PRE-CREATION 
>   samza-hdfs/src/test/resources/samza-hdfs-test-job.properties PRE-CREATION 
>   
> samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
>  PRE-CREATION 
>   settings.gradle 19bff97 
> 
> Diff: https://reviews.apache.org/r/35445/diff/
> 
> 
> Testing
> ---
> 
> Updated: See JIRA SAMZA-693 for details, this latest update (693-4) addresses 
> post-review issues and adds more pluggable design, several default writer 
> implementations, and more (and more thorough) unit tests.
> 
> Passes 'gradle clean test'.
> 
> 
> Thanks,
> 
> Eli Reisman
> 
>



Re: Review Request 35445: SAMZA-693: Very basic HDFS Producer service for Samza

2015-07-30 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35445/#review93614
---



docs/learn/documentation/versioned/hdfs/producer.md (line 33)
<https://reviews.apache.org/r/35445/#comment148011>

com.etsy -> org.apache.



docs/learn/documentation/versioned/hdfs/producer.md (line 65)
<https://reviews.apache.org/r/35445/#comment148012>

you also need to add a link in the document/index.html to make it 
"reachable".



samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/BinarySequenceFileHdfsWriter.scala
 (line 38)
<https://reviews.apache.org/r/35445/#comment148015>

can we use "camel case with an initial lower-case character" for the 
variable? Just to follow the same coding guide: 
http://samza.apache.org/contribute/coding-guide.html

Thank you.



samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/Bucketer.scala 
(line 38)
<https://reviews.apache.org/r/35445/#comment148016>

not necessary



samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/JobNameDateTimeBucketer.scala
 (line 40)
<https://reviews.apache.org/r/35445/#comment148018>

My overall concern here is: if more than one task is running, is it possible 
that all the tasks are writing to one file at the same 
time?



samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/JobNameDateTimeBucketer.scala
 (line 55)
<https://reviews.apache.org/r/35445/#comment148017>

this will very possibly never be executed, because line 53 already makes 
currentDateTime == dateTime.
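
A hypothetical Java illustration of the pattern being flagged (the names and structure here are mine, not the actual JobNameDateTimeBucketer code): once the field is assigned from the incoming value, a later inequality check against that same value can never succeed.

```java
// Illustrative sketch only - not the real Bucketer implementation.
public class DeadBranchDemo {
    private String currentDateTime = "";

    // Returns true when the bucket should roll over.
    public boolean update(String dateTime) {
        currentDateTime = dateTime;              // "line 53": assignment happens first...
        if (!currentDateTime.equals(dateTime)) {
            return true;                         // ...so this branch is unreachable
        }
        return false;
    }

    public static void main(String[] args) {
        DeadBranchDemo d = new DeadBranchDemo();
        System.out.println(d.update("2015-07-30")); // always false
    }
}
```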



samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/TextSequenceFileHdfsWriter.scala
 (line 37)
<https://reviews.apache.org/r/35445/#comment148024>

I would prefer the param idea because 1) Samza is already using this 
fashion, and 2) it is less code, especially when more SequenceFileHdfsWriter 
variants come out (LongWritable, etc.).

"like the casting of the outgoing message to something not-Writable like 
Array[Byte] or String might require a third param and it might start to get 
awkward"

-- We can always cast the outgoing msg to Array[Byte] using the serde 
defined for this msg. So as long as the Writable accepts Array[Byte], this 
should be fine.

"Also there are some Writable types that would not allow us to determine 
message size for batching purposes the way "

-- I think we can either give it a default size (this can be configurable) 
when there is no getLength method, or use a subclass. Either way will be fine.



samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
 (line 106)
<https://reviews.apache.org/r/35445/#comment148019>

do we need to bring up the MiniCluster in every test? Would it be better if we 
just bring it up in beforeClass and then shut it down in afterClass?


- Yan Fang


On July 28, 2015, 5:25 a.m., Eli Reisman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/35445/
> ---
> 
> (Updated July 28, 2015, 5:25 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-693: Very basic HDFS Producer service for Samza
> 
> 
> Diffs
> -
> 
>   build.gradle 0852adc 
>   docs/learn/documentation/versioned/hdfs/producer.md PRE-CREATION 
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala 
> PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala 
> PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducerMetrics.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/BinarySequenceFileHdfsWriter.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/Bucketer.scala 
> PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/HdfsWriter.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/JobNameDateTimeBucketer.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/SequenceFileHdfsWriter.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/TextSequenceFileHdfsWr

Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-07-29 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/
---

(Updated July 29, 2015, 10:49 p.m.)


Review request for samza.


Changes
---

remove whitespace
update to latest master


Bugs: SAMZA-676
https://issues.apache.org/jira/browse/SAMZA-676


Repository: samza


Description
---

1. added offsetComparator method in SystemAdmin Interface

2. added "task.global.inputs" config

3. rewrote Grouper classes using Java; allows assigning global streams during 
grouping

4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer to 
preserve messages order

5. added taskNames to the offsets in OffsetManager

6. allowed to assign one SSP to multiple taskInstances

7. skipped already-processed messages in RunLoop

8. unit tests for all changes


Diffs (updated)
-

  checkstyle/import-control.xml 6654319 
  docs/learn/documentation/versioned/container/samza-container.md 9f46414 
  docs/learn/documentation/versioned/jobs/configuration-table.html ea73b40 
  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb 
  
samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
 249b8ae 
  samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 
PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
 PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
20e5d26 
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
27b2517 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
c5a5ea5 
  
samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 
9dc7051 
  
samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
 44e95fc 
  
samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
 3c0acad 
  
samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
 097f410 
  samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java 
PRE-CREATION 
  
samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
 PRE-CREATION 
  
samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
 PRE-CREATION 
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
8d54c46 
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
64a5844 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
84fdeaa 
  samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
7caad28 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
 a14169b 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
 74daf72 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
 deb3895 
  
samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 
4097ac7 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
 1fd5dd3 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 
35086f5 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
 de00320 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
 1629035 
  
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
 2a84328 
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
b063366 
  
samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
 1e936b4 

Diff: https://reviews.apache.org/r/34974/diff/


Testing
---


Thanks,

Yan Fang



Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

2015-07-29 Thread Yan Fang


> On July 29, 2015, 2:45 p.m., Robert Zuljevic wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala,
> >  lines 121-125
> > 
> >
> > Did you mean something like this?
> > 
> > for ((storeName, systemStream) <- changeLogSystemStreams) {
> >   val systemAdmin = Util.getObj[SystemFactory](config
> > .getSystemFactory(systemStream.getSystem)
> > .getOrElse(throw new SamzaException("A stream uses system %s, 
> > which is missing from the configuration." format systemStream.getSystem))
> > ).getAdmin(systemStream.getSystem, config)
> > 
> >   systemAdmin.createChangelogStream(systemStream.getStream, 
> > changeLogPartitions)
> > }
> > 
> > This is the only way I could think of to simplify this. I don't 
> > think what you posted would work, because you're using String's map 
> > function, but it did steer me in the right direction. Do you agree?

yes, this is correct. :) Thanks.


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93454
---


On July 9, 2015, 2:39 p.m., Robert Zuljevic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36163/
> ---
> 
> (Updated July 9, 2015, 2:39 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Removed trailing whitespaces
> 
> 
> Diffs
> -
> 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
> 7a588ebc99b5f07d533e48e10061a3075a63665a 
>   
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
>  249b8ae3a904716ea51a2b27c7701ac30d13b854 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> aeba61a95371faaba23c97d896321b8d95467f87 
>   
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
>  097f41062f3432ae9dc9a9737b48ed7b2f709f20 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
> 8d54c4639fc226b34e64915935c1d90e5917af2e 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
>  d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
>  35086f54f526d5d88ad3bc312b71fce40260e7c6 
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
> b063366f0f60e401765a000fa265c59dee4a461e 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
>  1e936b42a5b9a4bfb43766c17847b2947ebdb21d 
> 
> Diff: https://reviews.apache.org/r/36163/diff/
> 
> 
> Testing
> ---
> 
> I wasn't really sure what kind of test (unit test / integration test) I 
> should make here, so any pointers would be greatly appreciated! I tested the 
> change with the unit/integration tests already available.
> 
> 
> Thanks,
> 
> Robert Zuljevic
> 
>



Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

2015-07-29 Thread Yan Fang


> On July 25, 2015, 1:33 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala,
> >  lines 121-125
> > <https://reviews.apache.org/r/36163/diff/2/?file=1003380#file1003380line121>
> >
> > this can be simplified a little:
> > 
> > for ((storeName, systemStream) <- changeLogSystemStreams) {
> >   val systemAdmin = config
> > .getSystemFactory(systemStream.getName)
> > .getOrElse(throw new SamzaException("A stream uses system %s, 
> > which is missing from the configuration." format 
> > systemName)).map(Util.getObj[SystemFactory](_)).getOrElse(systemStream.getSystem,
> >   throw new SamzaException("Unable to get systemAdmin for store 
> > " + storeName + " and systemStream" + systemStream))
> >   
> >   
> > Then  do not need line 104-109, line 117-119.
> 
> Robert Zuljevic wrote:
> Did you mean something like this?
> 
> for ((storeName, systemStream) <- changeLogSystemStreams) {
>   val systemAdmin = Util.getObj[SystemFactory](config
> .getSystemFactory(systemStream.getSystem)
> .getOrElse(throw new SamzaException("A stream uses system %s, 
> which is missing from the configuration." format systemStream.getSystem))
> ).getAdmin(systemStream.getSystem, config)
> 
>   systemAdmin.createChangelogStream(systemStream.getStream, 
> changeLogPartitions)
> }
> 
> 
> This is the only way I could think of to simplify this. I don't 
> think what you posted would work, because you're using String's map function, 
> but it did steer me in the right direction. Do you agree?

yes, you are right. This is what I was thinking. Not tested the code though. :)


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93019
---


On July 9, 2015, 2:39 p.m., Robert Zuljevic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36163/
> ---
> 
> (Updated July 9, 2015, 2:39 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Removed trailing whitespaces
> 
> 
> Diffs
> -
> 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
> 7a588ebc99b5f07d533e48e10061a3075a63665a 
>   
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
>  249b8ae3a904716ea51a2b27c7701ac30d13b854 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> aeba61a95371faaba23c97d896321b8d95467f87 
>   
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
>  097f41062f3432ae9dc9a9737b48ed7b2f709f20 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
> 8d54c4639fc226b34e64915935c1d90e5917af2e 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
>  d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
>  35086f54f526d5d88ad3bc312b71fce40260e7c6 
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
> b063366f0f60e401765a000fa265c59dee4a461e 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
>  1e936b42a5b9a4bfb43766c17847b2947ebdb21d 
> 
> Diff: https://reviews.apache.org/r/36163/diff/
> 
> 
> Testing
> ---
> 
> I wasn't really sure what kind of test (unit test / integration test) I 
> should make here, so any pointers would be greatly appreciated! I tested the 
> change with the unit/integration tests already available.
> 
> 
> Thanks,
> 
> Robert Zuljevic
> 
>



Re: [DISCUSS] Release 0.10.0

2015-07-28 Thread Yan Fang
Actually, I also want to include a few patch-available features, especially:

1. broadcast stream (SAMZA-676)
- waiting for review

2. graphite support (SAMZA-340)
3. meter and histogram (SAMZA-683)
4. utility (SAMZA-401)
- 2, 3, 4 belong to Luis. If he does not have time to update them, since
they only need some small changes, we can edit them and get a +1 from another
committer.

5. hdfs producer (SAMZA-693)
- I am reviewing.

6. upgrade yarn to 2.7.1 (SAMZA-563)
   - though I am reviewing, this ticket is negotiable if we want to put it
into the 0.10.0 release. If we do not, I think, when users enable the
worker-persisting and container-persisting features, Samza will not be able
to handle them. (Some classes are only available after YARN 2.5.0, while Samza
currently only supports YARN 2.4.0.)

7. others: scrooge, class loader isolation, etc.
- those are waiting for review too.

My opinion is that it will be great if we can clean up all the patch-available
tickets. Most of them have already been reviewed more than once, so I think it
should not be very time-consuming to include them in the 0.10.0
release.

What do you think?

Of course, another must-have is the bug-fix of the Stream Appender. :)

Thanks,

Fang, Yan
yanfang...@gmail.com

On Tue, Jul 28, 2015 at 10:27 PM, Roger Hoover 
wrote:

> Thanks, Yi.
>
> I propose that we also include SAMZA-741 for Elasticsearch versioning
> support with the new ES producer.  I think it's very close to being merged.
>
> Roger
>
>
> On Tue, Jul 28, 2015 at 10:08 PM, Yi Pan  wrote:
>
> > Hi, all,
> >
> > I want to start the discussion on the release schedule for 0.10.0. There
> > are a few important features that we plan to release in 0.10.0 and I want
> > to start this thread s.t. we can agree on what to include in 0.10.0
> > release.
> >
> > There are the following main features added in 0.10.0:
> > - RocksDB TTL support
> > - Add CoordinatorStream and disable CheckpointManager
> > - Elasticsearch Producer
> > - Host affinity
> > And other 0.10.0 tickets:
> >
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SAMZA%20AND%20status%20in%20(%22In%20Progress%22%2C%20Resolved%2C%20Closed)%20AND%20fixVersion%20%3D%200.10.0
> >
> > I propose to cut a 0.10.0 release after we get the following issues
> > resolved:
> > - SAMZA-615: Migrate checkpoint from checkpoint topic to Coordinator
> stream
> > - SAMZA-617: YARN host affinity in Samza
> >
> > Thoughts?
> >
> > Thanks!
> >
> > -Yi
> >
>


Re: no new topic created on Kafka

2015-07-28 Thread Yan Fang
 task.class=samza.http.demo.task.HttpDemoParserStreamTask ...

you are not using the HttpDemoStatsStreamTask class...

Fang, Yan
yanfang...@gmail.com

On Tue, Jul 28, 2015 at 11:48 AM, Job-Selina Wu 
wrote:

> Hi, Yan
>
> I'd like to correct my previous comment: when I comment out
> systems.kafka.streams.http-demo.samza.offset.default=oldest
> systems.kafka.streams.http-demo.samza.reset.offset=true
>
> the logger output is not shown at samza-container-0.log, but it makes sense.
>
>
> Sincerely,
> Seina
>
> On Tue, Jul 28, 2015 at 11:30 AM, Job-Selina Wu 
> wrote:
>
> > Hi, Yan:
> >
> >   Thanks a lot for your reply.
> >  I tried to comment out
> > systems.kafka.http-demo.samza.offset.default=oldest
> > and then I tried to comment out
> > systems.kafka.streams.http-demo.samza.offset.default=oldest
> > systems.kafka.streams.http-demo.samza.reset.offset=true
> >
> >  The result is the same as before: 1. the checkpoint topic was created, 2.
> > the log created by Logger can be found at /samza-container-0.log. 3. no
> > exception is at samza-container-0.log.
> >
> >I guess something conflicts between HttpDemoParserStreamTask and
> > HttpDemoStatsStreamTask? Is some resource registered by
> > HttpDemoParserStreamTask such that HttpDemoStatsStreamTask cannot recreate
> > a topic?
> >
> > Sincerely,
> > Selina
> >
> > On Tue, Jul 28, 2015 at 9:37 AM, Yan Fang  wrote:
> >
> >> Can you comment out "systems.kafka.http-demo.samza.offset.default=oldest"
> >> to see how it works? This does not seem to be a correct property.
> >>
> >> Thanks,
> >>
> >> Fang, Yan
> >> yanfang...@gmail.com
> >>
> >> On Mon, Jul 27, 2015 at 5:54 PM, Job-Selina Wu 
> >> wrote:
> >>
> >> > Hi, Dear All:
> >> >
> >> >I have two Tasks at Samza. HttpDemoParserStreamTask and
> >> > HttpDemoStatsStreamTask. They are almost the same, except the output
> >> > topic name and the task name are different in the properties file. I am
> >> > wondering how I should debug this?
> >> >
> >> >More details are listed below.
> >> >
> >> >All your help is highly appreciated.
> >> >
> >> > Sincerely,
> >> > Selina
> >> >
> >> > Currently HttpDemoParserStreamTask runs well.
> >> > However, HttpDemoStatsStreamTask generates the log correctly without
> >> > exceptions at
> >> >
> >> >
> >>
> deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_02/
> >> > samza-container-0.log
> >> >
> >> > The last record, shown below, is right; however, no topic "
> >> > demo-stats-temp" was created.
> >> > --
> >> >
> >> > 2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO]
> >> > key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27
> >> >
> >> >
> >>
> 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"
> >> >
> >>
> http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi
> >> >
> >>
> ","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung
> >> > Galaxy S6","operationSystem":"Android
> >> >
> >> >
> >>
> 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}
> >> >
> >> >
> >> > ---The demo-stats.properties
> >> > files-
> >> >
> >> > # Job
> >> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> >> > job.name=demo-stats-tmp
> >>
> >> >
> >> >
> >> >
> >> >
> >>
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> >> >  task.checkpoint.system=kafka
> >> >  # Normally, this would be 3, but we have only one

Re: no new topic created on Kafka

2015-07-28 Thread Yan Fang
Can you comment out "systems.kafka.http-demo.samza.offset.default=oldest"
to see how it works? This does not seem to be a correct property.
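
For reference, Samza's per-stream overrides carry a "streams" segment in the key, so the working form (already present further down in this job's config) is:

```properties
# Per-stream override: note the ".streams." segment in the key.
systems.kafka.streams.http-demo.samza.offset.default=oldest
# The variant without ".streams." is not a recognized Samza property:
# systems.kafka.http-demo.samza.offset.default=oldest
```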

Thanks,

Fang, Yan
yanfang...@gmail.com

On Mon, Jul 27, 2015 at 5:54 PM, Job-Selina Wu 
wrote:

> Hi, Dear All:
>
>I have two Tasks at Samza. HttpDemoParserStreamTask and
> HttpDemoStatsStreamTask. They are almost the same, except the output topic
> name and the task name are different in the properties file. I am
> wondering how I should debug this?
>
>More details are listed below.
>
>All your help is highly appreciated.
>
> Sincerely,
> Selina
>
> Currently HttpDemoParserStreamTask runs well.
> However, HttpDemoStatsStreamTask generates the log correctly without
> exceptions at
>
> deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_02/
> samza-container-0.log
>
> The last record, shown below, is right; however, no topic "
> demo-stats-temp" was created.
> --
>
> 2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO]
> key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27
>
> 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"
> http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi
> ","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung
> Galaxy S6","operationSystem":"Android
>
> 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}
>
>
> ---The demo-stats.properties
> files-
>
> # Job
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.name=demo-stats-tmp
>
>
>
>  
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>  task.checkpoint.system=kafka
>  # Normally, this would be 3, but we have only one broker.
>  task.checkpoint.replication.factor=1
>
>  # YARN
>
>  
> yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
>
>  # Task
>  task.class=samza.http.demo.task.HttpDemoParserStreamTask
>  task.inputs=kafka.http-demo
>
>  # Serializers
>
>  
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
>  # Kafka System
>
>  systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>  systems.kafka.samza.msg.serde=string
>
>  systems.kafka.samza.key.serde=string
>  systems.kafka.consumer.zookeeper.connect=localhost:2181/
>  systems.kafka.producer.bootstrap.servers=localhost:9092
>
>  #stream from begining
>  #systems.kafka.consumer.auto.offset.reset=smallest
> #http-demo from the oldest
>  systems.kafka.http-demo.samza.offset.default=oldest
> # all stream from the oldest
>  systems.kafka.streams.http-demo.samza.offset.default=oldest
>  systems.kafka.streams.http-demo.samza.reset.offset=true
>
>
>
> HttpDemoStatsStreamTask class
>
> public class HttpDemoStatsStreamTask implements StreamTask  {
>
> //output topic
> private static final SystemStream OUTPUT_STREAM = new
> SystemStream("kafka", "demo-stats-temp");
> Logger logger = LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);
>
> @SuppressWarnings("unchecked")
> @Override
> public void process(IncomingMessageEnvelope envelope,
> MessageCollector collector, TaskCoordinator coordinator) throws
> Exception {
>
>
> String key = (String) envelope.getKey();
> String message = envelope.getMessage().toString();
> logger.info("key=" + key + ": message=" + message);
>
> collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
> message));
> }
> }
>
> -Tail  of __samza_checkpoint_ver_1_for_demo-stats-tmp_1
> topic--
>
> {"Partition 0":0}
> {"SystemStreamPartition [kafka, http-demo,
> 0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","

Re: Review Request 36768: SAMZA-740: Add ElasticsearchProducer example to samza-hello-samza

2015-07-27 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36768/#review93137
---


I think it's ok to keep the elastic grid separate, though there is a little 
redundant code, because conceptually they are different - this one is totally an 
add-on, while the "main" grid is for the must-have Samza components.


bin/grid-elastic (line 53)
<https://reviews.apache.org/r/36768/#comment147398>

remove space



bin/grid-elastic (line 121)
<https://reviews.apache.org/r/36768/#comment147399>

remove space



bin/grid-elastic (line 127)
<https://reviews.apache.org/r/36768/#comment147400>

remove the space


- Yan Fang


On July 24, 2015, 8:31 a.m., Stuart Davidson wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36768/
> ---
> 
> (Updated July 24, 2015, 8:31 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza-hello-samza
> 
> 
> Description
> ---
> 
> I'd put together an example of how to use the new ElasticsearchProvider for 
> my work colleagues and I thought it'd be worth submitting it back to the 
> community. This also includes a script to start elasticsearch and kibana on 
> the host - be aware, there's a 64bit version depending on what machine you 
> want to run against.
> 
> Also note, this is against 0.10.0 of Samza which is not released yet. I am 
> making the assumption that the Elasticsearch jars are bundled as part of that 
> - if not, we'll need to add them to the POM here.
> 
> 
> Diffs
> -
> 
>   bin/grid-elastic PRE-CREATION 
>   pom.xml f9c4fa9 
>   src/main/assembly/src.xml f57fee2 
>   src/main/config/wikipedia-elastic.properties PRE-CREATION 
>   src/main/java/samza/examples/wikipedia/task/WikipediaElasticStreamTask.java 
> PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/36768/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Stuart Davidson
> 
>



Re: kafka producer failed

2015-07-26 Thread Yan Fang
You may check the Kafka log to see what's inside.

Yan Fang

> On Jul 26, 2015, at 2:01 AM, Job-Selina Wu  wrote:
> 
> The exception is below:
> 
> kafka.common.FailedToSendMessageException: Failed to send messages after 3
> tries.
> kafka.common.FailedToSendMessageException: Failed to send messages after 3
> tries.
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> at kafka.producer.Producer.send(Producer.scala:77)
> at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> at http.server.HttpDemoHandler.doDemo(HttpDemoHandler.java:71)
> at http.server.HttpDemoHandler.handle(HttpDemoHandler.java:32)
> at
> org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
> at org.eclipse.jetty.server.Server.handle(Server.java:498)
> at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:265)
> at
> org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:243)
> at
> org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
> at
> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:610)
> at
> org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:539)
> at java.lang.Thread.run(Thread.java:745)
> 
> On Sun, Jul 26, 2015 at 12:42 AM, Job-Selina Wu 
> wrote:
> 
>> Hi, Yan:
>> 
>>  My Http Server send message to Kafka.
>> 
>> The server.log at deploy/kafka/logs/server.log shown :
>> 
>> [2015-07-26 00:33:51,910] INFO Closing socket connection to /127.0.0.1. 
>> (kafka.network.Processor)
>> [2015-07-26 00:33:51,984] INFO Closing socket connection to /127.0.0.1. 
>> (kafka.network.Processor)
>> [2015-07-26 00:33:52,011] INFO Closing socket connection to /127.0.0.1. 
>> (kafka.network.Processor)
>> 
>> .
>> 
>> 
>> Your help is highly appreciated.
>> 
>> Sincerely,
>> 
>> Selina
>> 
>> 
>>> On Sun, Jul 26, 2015 at 12:01 AM, Yan Fang  wrote:
>>> 
>>> You are giving the Kafka code and the Samza log, which does not make sense
>>> actually...
>>> 
>>> Fang, Yan
>>> yanfang...@gmail.com
>>> 
>>> On Sat, Jul 25, 2015 at 10:31 PM, Job-Selina Wu 
>>> wrote:
>>> 
>>>> Hi, Yi, Navina and Benjamin:
>>>> 
>>>>Thanks a lot to spending your time to help me this issue.
>>>> 
>>>>The configuration is below. Do you think it could be the
>>> configuration
>>>> problem?
>>>> I tried props.put("request.required.acks", "0"); and
>>>> props.put("request.required.acks", "1"); both did not work.
>>>> 
>>>> 
>>>> Properties props = new Properties();
>>>> 
>>>>private final Producer<String, String> producer;
>>>> 
>>>>public KafkaProducer() {
>>>>//BOOTSTRAP.SERVERS
>>>>props.put("metadata.broker.list", "localhost:9092");
>>>>props.put("bootstrap.servers", "localhost:9092 ");
>>>>props.put("serializer.class", "kafka.serializer.StringEncoder");
>>>>props.put("partitioner.class", "com.kafka.SimplePartitioner");
>>>>props.put("request.required.acks", "0");
>>>> 
>>>>ProducerConfig config = new ProducerConfig(props);
>>>> 
>>>>producer = new Producer<String, String>(config);
>>>>}
>>>> 
>>>> --
>>>> 
>>>> Exceptions at log are list below.
>>>> 
>>>> Your help is highly appreciated.
>>>> 
>>>> Sincerely,
>>>> Selina Wu
>>>> 
>>>> 
>>>> Exceptions at log
>>> deploy/yarn/logs/userlogs/application_1437886672214_0001/container_1437886672214_0001_01_01/samza-application-master.log
>>>> 
>>>> 2015-07-25 22:03:52 Shell [DEBUG] Failed to detect a valid hadoop home
>>>> directory
>>>> *java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set*.
>>>>   at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:265)
>>>>   at org.apache.hadoop.util.Shell.(Shell.java:290)
>>>>   at org.apache.hadoop.util.StringUtils.(StringUtils.java:76)
>>>>   at
>>> org.apache.hadoop.yarn.conf.YarnConfiguration.(YarnConfiguration.java:517)
>>>>   at
>>>> org.a

Re: kafka producer failed

2015-07-26 Thread Yan Fang
You are giving the Kafka code and the Samza log, which does not make sense
actually...

Fang, Yan
yanfang...@gmail.com

On Sat, Jul 25, 2015 at 10:31 PM, Job-Selina Wu 
wrote:

> Hi, Yi, Navina and Benjamin:
>
> Thanks a lot to spending your time to help me this issue.
>
> The configuration is below. Do you think it could be the configuration
> problem?
> I tried props.put("request.required.acks", "0"); and
>  props.put("request.required.acks", "1"); both did not work.
>
> 
> Properties props = new Properties();
>
> private final Producer<String, String> producer;
>
> public KafkaProducer() {
> //BOOTSTRAP.SERVERS
> props.put("metadata.broker.list", "localhost:9092");
> props.put("bootstrap.servers", "localhost:9092 ");
> props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("partitioner.class", "com.kafka.SimplePartitioner");
> props.put("request.required.acks", "0");
>
> ProducerConfig config = new ProducerConfig(props);
>
> producer = new Producer<String, String>(config);
> }
>
> --
>
>  Exceptions at log are list below.
>
> Your help is highly appreciated.
>
> Sincerely,
> Selina Wu
>
>
> Exceptions at log
>
> deploy/yarn/logs/userlogs/application_1437886672214_0001/container_1437886672214_0001_01_01/samza-application-master.log
>
> 2015-07-25 22:03:52 Shell [DEBUG] Failed to detect a valid hadoop home
> directory
> *java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set*.
>at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:265)
>at org.apache.hadoop.util.Shell.(Shell.java:290)
>at org.apache.hadoop.util.StringUtils.(StringUtils.java:76)
>at
> org.apache.hadoop.yarn.conf.YarnConfiguration.(YarnConfiguration.java:517)
>at
> org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:77)
>at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> 2015-07-25 22:03:52 Shell [DEBUG] setsid is not available on this
> machine. So not using it.
> 2015-07-25 22:03:52 Shell [DEBUG] setsid exited with exit code 0
> 2015-07-25 22:03:52 ClientHelper [INFO] trying to connect to RM
> 127.0.0.1:8032
> 2015-07-25 22:03:52 AbstractService [DEBUG] Service:
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl entered state
> INITED
> 2015-07-25 22:03:52 RMProxy [INFO] Connecting to ResourceManager at
> /127.0.0.1:8032
> 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field
> org.apache.hadoop.metrics2.lib.MutableRate
> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess
> with annotation
> @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=,
> always=false, type=DEFAULT, value=[Rate of successful kerberos logins
> and latency (milliseconds)], valueName=Time)
> 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field
> org.apache.hadoop.metrics2.lib.MutableRate
> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure
> with annotation
> @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=,
> always=false, type=DEFAULT, value=[Rate of failed kerberos logins and
> latency (milliseconds)], valueName=Time)
> 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field
> org.apache.hadoop.metrics2.lib.MutableRate
> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.getGroups
> with annotation
> @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=,
> always=false, type=DEFAULT, value=[GetGroups], valueName=Time)
> 2015-07-25 22:03:52 MetricsSystemImpl [DEBUG] UgiMetrics, User and
> group related metrics
> 2015-07-25 22:03:52 KerberosName [DEBUG] Kerberos krb5 configuration
> not found, setting default realm to empty
> 2015-07-25 22:03:52 Groups [DEBUG]  Creating new Groups object
> 2015-07-25 22:03:52 NativeCodeLoader [DEBUG] Trying to load the
> custom-built native-hadoop library...
> 2015-07-25 22:03:52 NativeCodeLoader [DEBUG] Failed to load
> native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in
> java.library.path
> 2015-07-25 22:03:52 NativeCodeLoader [DEBUG]
>
> java.library.path=/home//Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
> 2015-07-25 22:03:52 NativeCodeLoader [WARN] Unable to load
> native-hadoop library for your platform... using builtin-java classes
> where applicable
>
>
> 2015-07-25 22:03:53 KafkaCheckpointManager [WARN] While trying to
> validate topic __samza_checkpoint_ver_1_for_demo-parser7_1:
> *kafka.common.LeaderNotAvailableException. Retrying.*
> 2015-07-25 22:03:53 KafkaCheckpointManager [DEBUG] Exception detail:
> kafka.common.LeaderNotAvailableException
>at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.

Re: Security on YARN

2015-07-24 Thread Yan Fang
Hi Chen Song,

If you can work on this issue, it will be great.

1. the related ticket is https://issues.apache.org/jira/browse/SAMZA-727

2. most of the change will happen in Yarn AM and Yarn client parts. The
code sits in the samza-yarn package
<https://github.com/apache/samza/tree/master/samza-yarn/src/main/scala/org/apache/samza/job/yarn>
.

3. when you implement this, make sure it does not affect the non-secure
Yarn implementation, because the non-secure implementation has been
proven to work, while a secure cluster may have the issue Yi Pan
mentioned: "For a long-running Samza job, it does not work. We will need
a way to refresh the Kerberos ticket periodically, which is not supported
yet." But I am happy to see that we at least have some support for a
secure cluster. We can figure that issue out later.

If you want to have some help in understanding the existing code, let me
know.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 24, 2015 at 7:00 PM, Chen Song  wrote:

> Can someone give some context on this? I can volunteer myself and try
> working on this.
>
> Chen
>
> On Thu, Jul 2, 2015 at 4:29 AM, Qi Fu  wrote:
>
> > Hi Yi & Yan,
> >
> > Many thanks for your information. I have created a jira for this:
> > https://issues.apache.org/jira/browse/SAMZA-727
> > I'm willing to test it if someone can work on this.
> >
> >
> > -Qi
> >
> > 
> > From: Yi Pan 
> > Sent: Thursday, July 2, 2015 1:38 AM
> > To: dev@samza.apache.org
> > Subject: Re: Security on YARN
> >
> > Hi, Yan,
> >
> > Your memory serves as well as mine. :) I remember that Chris and I
> > discussed this Kerberos ticket expiration issue when we were brain
> storming
> > on how to access HDFS data in Samza. At high-level, what happens is that
> > the Kerberos ticket to access a secured Hadoop cluster is issued to Samza
> > containers at the job start time, and will expire later. For a
> long-running
> > Samza job, it does not work. We will need a way to refresh the Kerberos
> > ticket periodically, which is not supported yet. Chris probably can chime
> > in with more details.
> >
> > -Yi
> >
> > On Wed, Jul 1, 2015 at 4:08 PM, Yan Fang  wrote:
> >
> > > Hi Qi,
> > >
> > > I think this is caused by the fact that Samza currently does not
> support
> > > Yarn with Kerberos. Feel free to open a ticket for this feature.
> > >
> > > But if my memory serves, there was an issue mentioned about the
> Kerberos.
> > > Seems when the Kerberos ticket expires, Samza will have some issues?
> Can
> > > not find the resource. Anyone remember this?
> > >
> > > Cheers,
> > >
> > > Fang, Yan
> > > yanfang...@gmail.com
> > >
> > > On Wed, Jul 1, 2015 at 3:41 AM, Qi Fu  wrote:
> > >
> > > > Hi all,
> > > >
> > > >
> > > > I'm testing Samza on YARN and I have encountered a problem on the
> > > security
> > > > setting of YARN (Kerberos). Here is the detail:
> > > >
> > > > 1. My cluster is secured by Kerberos, and I deploy my samza job from
> > one
> > > > of the cluster.
> > > >
> > > >
> > > > 2. My config file is in ~/.samza/conf/(yarn-site.xml, core-site.xml,
> > > > hdfs-site.xml)
> > > >
> > > >
> > > > 3. The job is deployed successfully, and I can get the info such as:
> > > >
> > > > ClientHelper [INFO] set package url to scheme: "hdfs" port: -1
> > file:
> > > > "/user/test/samzatest.tar.gz" for application_1435680272316_0003
> > > >
> > > > ClientHelper [INFO] set package size to 212924524 for
> > > > application_1435680272316_0003
> > > >
> > > >
> > > >
> > > > I think the security setting is correct as it can get the file
> size
> > > > from HDFS.
> > > >
> > > >
> > > > 4. But I get the error from YARN job manager as following:
> > > >
> > > >
> > > > Application application_1435680272316_0003 failed 2 times due to
> AM
> > > > Container for appattempt_1435680272316_0003_02 exited with
> > exitCode:
> > > > -1000
> > > >
> > > > For more detailed output, check application tracking page:
> > > > http://cdh-namenode:8088/proxy/application_1435680272316_0003/Then,
> > > click
> > > > on links 

Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

2015-07-24 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93019
---



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(line 80)
<https://reviews.apache.org/r/36163/#comment147281>

it should be config, not coordinatorSystemConfig, because we need to update 
the config from the stream.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(line 101)
<https://reviews.apache.org/r/36163/#comment147282>

it's private because it's only used by this class.

Also move this to the end of the class because it is good to put all the 
private methods together.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(lines 121 - 125)
<https://reviews.apache.org/r/36163/#comment147283>

this can be simplified a little (sketch — check it compiles against the 
current Config/SystemFactory API):

for ((storeName, systemStream) <- changeLogSystemStreams) {
  val systemAdmin = config
    .getSystemFactory(systemStream.getSystem)
    .map(Util.getObj[SystemFactory](_))
    .getOrElse(throw new SamzaException(
      "A stream uses system %s, which is missing from the configuration."
        format systemStream.getSystem))
    .getAdmin(systemStream.getSystem, config)
  ...
}

Then we do not need line 104-109 and line 117-119.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(line 126)
<https://reviews.apache.org/r/36163/#comment147284>

add logs for the case where the topic already exists. Log the metadata 
information (like the original createStream code does).


- Yan Fang


On July 9, 2015, 2:39 p.m., Robert Zuljevic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36163/
> ---
> 
> (Updated July 9, 2015, 2:39 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Removed trailing whitespaces
> 
> 
> Diffs
> -
> 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
> 7a588ebc99b5f07d533e48e10061a3075a63665a 
>   
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
>  249b8ae3a904716ea51a2b27c7701ac30d13b854 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> aeba61a95371faaba23c97d896321b8d95467f87 
>   
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
>  097f41062f3432ae9dc9a9737b48ed7b2f709f20 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
> 8d54c4639fc226b34e64915935c1d90e5917af2e 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
>  d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
>  35086f54f526d5d88ad3bc312b71fce40260e7c6 
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
> b063366f0f60e401765a000fa265c59dee4a461e 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
>  1e936b42a5b9a4bfb43766c17847b2947ebdb21d 
> 
> Diff: https://reviews.apache.org/r/36163/diff/
> 
> 
> Testing
> ---
> 
> I wasn't really sure what kind of test (unit test / integration test) I 
> should make here, so any pointers would be greatly appreaciated! I tested the 
> change with the unit/integration tests already available.
> 
> 
> Thanks,
> 
> Robert Zuljevic
> 
>



Re: Samza: can not produce new data to kafka

2015-07-24 Thread Yan Fang
{quote}
 I did not set auto.create.topics.enable anywhere
{quote}

Fine. Then it defaults to true. No worries.

{quote}
My job is listed as below. However I am wondering how can I know if my
method "public void* process*(IncomingMessageEnvelope envelope,
MessageCollector collector, TaskCoordinator coordinator)" was run or not.
{quote}

If you have logging enabled (from your code, you do), you can check the
container's log to see if it has the output. Assuming you are using the
local YARN that hello-samza provides, you should be able to find the logs
in deploy/yarn/userlogs/application_Id.

If you use the System.out/println method, you can see the result in the
deploy/yarn/userlogs/application_Id's sysout file (if the StreamTask
works).

If it does not work, you can check the logs in
deploy/yarn/userlogs/application_Id as well to see the exceptions, if
there are any.
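A minimal sketch of that log check, assuming hello-samza's local-grid layout (the deploy/yarn/userlogs path is taken from this thread; the application/container directory and its contents below are fabricated so the commands are runnable anywhere):

```shell
# Assumed layout: deploy/yarn/userlogs/<application_id>/<container_id>/
LOG_ROOT="deploy/yarn/userlogs"

# Fabricated sample data so this sketch is self-contained:
mkdir -p "$LOG_ROOT/application_0001/container_0001"
echo "key=k1: message=hello" > "$LOG_ROOT/application_0001/container_0001/stdout"

# 1) System.out / println output from a StreamTask lands in each
#    container's stdout (sysout) file:
cat "$LOG_ROOT"/application_*/container_*/stdout

# 2) Scan all container logs for exceptions:
grep -ri "exception" "$LOG_ROOT" || echo "no exceptions found"
```

Against a real job, skip the fabricated-data step and point LOG_ROOT at your actual deploy directory.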

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 24, 2015 at 1:45 PM, Job-Selina Wu 
wrote:

> Hi, Yan and Shadi:
>
> I made a mistake.  Actually, there is no log at /tmp/kafka-logs
> created by "  logger.info("key="+key+": message="+message); ".  The log I
> provided actually is log for input topic "http-demo" at
> /tmp/kafka-logs/http-demo-0
>
> My job is listed as below. However I am wondering how can I know if
> my method "public void* process*(IncomingMessageEnvelope envelope,
> MessageCollector collector, TaskCoordinator coordinator)" was run or not.
>
> I manually create topic "demo-duplicate" by command line, otherwise
> it will be created by samza code.
>
> I checked I did not set auto.create.topics.enable anywhere. Attached
> is my properties file for Kafka
>
>
>    Your help is highly appreciated
>
> Sincerely,
> Selina
>
> [image: Inline image 1]
>
>
>
>
> On Fri, Jul 24, 2015 at 11:56 AM, Yan Fang  wrote:
>
>> The code and the property seem good to me. collector.send(new
>> OutgoingMessageEnvelope(OUTPUT_STREAM, message));should work. So I am
>> curious if you accidentally disabled auto.create.topics.enable  ...Can you
>> also try to send msgs from cmd line to "demo-duplicate" to see if it gets
>> anything.
>>
>> Let me know if it works.
>>
>> Thanks,
>>
>> Fang, Yan
>> yanfang...@gmail.com
>>
>> On Fri, Jul 24, 2015 at 11:48 AM, Job-Selina Wu 
>> wrote:
>>
>> > Hi, Shadi:
>> >
>> >   Thans a lot for your reply.
>> > 1. There is no error log at Kafka and Samza
>> >
>> > 2.  this line "  logger.info("key="+key+": message="+message); " write
>> > log correctly as below:
>> >
>> > [image: Inline image 1]
>> >
>> > This are my last two message with right count
>> >
>> > 3. I tried both way below, none of them create topic, but I will try it
>> > again.
>> >
>> > collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
>> >
>> > //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
>> >
>> > 4. I wrote a topic call "http-demo" to Kafka as my input, and the
>> content
>> > can be show with command line below, so the Kafka should be OK.
>> > deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
>> > --from-beginning --topic http-demo
>> >
>> > Your help is highly appreciated.
>> >
>> > Sincerely,
>> > Selina
>> >
>> >
>> >
>> >
>> > On Fri, Jul 24, 2015 at 9:29 AM, Shadi Noghabi <
>> > snogh...@linkedin.com.invalid> wrote:
>> >
>> >> Selina,
>> >>
>> >> You should probably check a few things
>> >> 1. Your log files to see if you have any errors. Also, does you job
>> fail
>> >> or
>> >> continues running?
>> >> 2. Does this line "  logger.info("key="+key+": message="+message); "
>> >> write
>> >> any logs?
>> >> 3. This might not be the only reason, but you are sending messages of
>> >> type Map<String, String>. However, in your config file, you defined "
>> >> systems.kafka.samza.msg.serde=string" which expects the message to be a
>> >> String.
>> >>
>> >>
>> >> Shadi
>> >>
>> >>
>> >> On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu 
>> >> wrote:
>> >>
>> >> > Hi,  All
>> >> >
>> >> > 

Re: Can I get an example of using the ElasticSearch producer?

2015-07-24 Thread Yan Fang
Hi guys,

Thank you for your interest in this new producer. The producer is only
in the master branch, so if you are using the 0.9.1 release, you won't get
this support.

If by any chance you are using the latest version,

1) here are some configuration
http://samza.apache.org/learn/documentation/latest/jobs/configuration-table.html
, see the "Using Elasticsearch for output streams" part.

2) check this patch https://issues.apache.org/jira/browse/SAMZA-740 as
well. We haven't merged it into hello-samza yet, but it will give you some
idea of how to implement it. :)
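For reference, a minimal configuration sketch of such an Elasticsearch system. The key names below are from memory of the master-branch Elasticsearch producer (SAMZA-654) and should be verified against the configuration table linked above; the system name, host, and port are placeholders:

```properties
# Define an Elasticsearch system called "es" (factory class from the master branch):
systems.es.samza.factory=org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory
# Use a TransportClient against a local node (values are placeholders):
systems.es.client.factory=org.apache.samza.system.elasticsearch.client.TransportClientFactory
systems.es.client.transport.host=localhost
systems.es.client.transport.port=9300
```

A task would then write to it with collector.send(new OutgoingMessageEnvelope(new SystemStream("es", "my-index/my-type"), message)) — the "index/type" stream-name convention is also an assumption to double-check against the docs.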

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 24, 2015 at 1:20 PM, Job-Selina Wu 
wrote:

> Dear All:
>
> I like to have an example of using the ElasticSearch producer also.
>
>
> Thanks
> Selina
> swucaree...@gmail.com
>
>
>
>
> On Fri, Jul 24, 2015 at 1:03 PM, Woessner, Leo 
> wrote:
>
> > Can I get an example of using the ElasticSearch producer?
> >
> > leo.woess...@pearson.com
> >
>


Re: Samza: can not produce new data to kafka

2015-07-24 Thread Yan Fang
The code and the property seem good to me. collector.send(new
OutgoingMessageEnvelope(OUTPUT_STREAM, message)); should work. So I am
curious whether you accidentally disabled auto.create.topics.enable... Can
you also try to send msgs from the cmd line to "demo-duplicate" to see if it
gets anything.

Let me know if it works.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 24, 2015 at 11:48 AM, Job-Selina Wu 
wrote:

> Hi, Shadi:
>
>   Thans a lot for your reply.
> 1. There is no error log at Kafka and Samza
>
> 2.  this line "  logger.info("key="+key+": message="+message); " write
> log correctly as below:
>
> [image: Inline image 1]
>
> This are my last two message with right count
>
> 3. I tried both way below, none of them create topic, but I will try it
> again.
>
> collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
>
> //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
>
> 4. I wrote a topic call "http-demo" to Kafka as my input, and the content
> can be show with command line below, so the Kafka should be OK.
> deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
> --from-beginning --topic http-demo
>
> Your help is highly appreciated.
>
> Sincerely,
> Selina
>
>
>
>
> On Fri, Jul 24, 2015 at 9:29 AM, Shadi Noghabi <
> snogh...@linkedin.com.invalid> wrote:
>
>> Selina,
>>
>> You should probably check a few things
>> 1. Your log files to see if you have any errors. Also, does you job fail
>> or
>> continues running?
>> 2. Does this line "  logger.info("key="+key+": message="+message); "
>> write
>> any logs?
>> 3. This might not be the only reason, but you are sending messages of
>> type Map<String, String>. However, in your config file, you defined "
>> systems.kafka.samza.msg.serde=string" which expects the message to be a
>> String.
>>
>>
>> Shadi
>>
>>
>> On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu 
>> wrote:
>>
>> > Hi,  All
>> >
>> >  I am trying to write my first StreamTask class. I have a topic at
>> > Kafka called "http-demo". I like to read the topic and write it to
>> another
>> > topic called "demo-duplicate"
>> >
>> > Howeven there is not topic written to Kafka.
>> >
>> > My properties file and StreamTask are below.  Can anyone told me
>> what
>> > is the bug?
>> > BTW, if I set checkpoint or Metrics at properties file. the topic of
>> > checkpoint and metrics could be written to Kafka.  And the content of
>> >  input topic -- http-demo could be show correctly.
>> >
>> > Your help is highly appreciated.
>> >
>> > Sincerely,
>> > Selina
>> >
>> >
>> > - - -- - - - - -
>> > # Job
>> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>> > job.name=demo-parser
>>
>> >
>> > # YARN
>> >
>> >
>> yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
>> >
>> > # Task
>> > task.class=samza.http.demo.task.HttpDemoParserStreamTask
>> > task.inputs=kafka.http-demo
>> >
>> > # Serializers
>> >
>> >
>> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>> >
>> > # Kafka System
>> >
>> >
>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> > systems.kafka.samza.msg.serde=string
>> > systems.kafka.samza.key.serde=string
>> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
>> > systems.kafka.consumer.auto.offset.reset=largest
>> > systems.kafka.producer.bootstrap.servers=localhost:9092
>> > - - -- - - - - -
>> >
>> > My StreamTask class is simple also
>> >
>> > -
>> >
>> > /**
>> >  *
>> >  * Read data from http-demo topic and write it back to "demo-duplicate"
>> >  */
>> > public class HttpDemoParserStreamTask implements StreamTask {
>> >
>> > private static final SystemStream OUTPUT_STREAM = new
>> > SystemStream("kafka", "demo-duplicate");
>> > Logger logger =
>> > LoggerFactory.getLogger(HttpDemoParserStreamTask.class);
>> >
>> > @SuppressWarnings("unchecked")
>> > @Override
>> > public void process(IncomingMessageEnvelope envelope,
>> MessageCollector
>> > collector, TaskCoordinator coordinator) throws Exception {
>> >
>> > String key = (String) envelope.getKey();
>> > String message = envelope.getMessage().toString();
>> > logger.info("key="+key+": message="+message);
>> >
>> > Map<String, String> outgoingMap = (Map<String, String>)
>> > (envelope.getMessage());
>> > collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
>> > outgoingMap));
>> > //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
>> > message));
>> > }
>> >
>> > }
>> >
>> > ---
>> >
>>
>
>


Re: Review Request 36545: SAMZA-682 Refactor Coordinator stream messages

2015-07-23 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36545/#review92825
---



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
(line 43)
<https://reviews.apache.org/r/36545/#comment147063>

To be consistent, let's go with TaskName, not String.



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
(line 57)
<https://reviews.apache.org/r/36545/#comment147064>

Any reason that you do not want to use the TaskName class? TaskName seems 
fine here.



samza-core/src/main/java/org/apache/samza/container/LocalityManager.java (line 
50)
<https://reviews.apache.org/r/36545/#comment147065>

sourceSuffix is more descriptive.



samza-core/src/main/java/org/apache/samza/manager/CoordinatorStreamManager.java 
(line 20)
<https://reviews.apache.org/r/36545/#comment147066>

I think it makes sense for this class to stay in its original package: 
samza-core/src/main/java/org/apache/samza/coordinator/stream, because it's only 
about the coordinator stream, not an overall "manager" for Samza.



samza-core/src/main/java/org/apache/samza/manager/CoordinatorStreamManager.java 
(line 29)
<https://reviews.apache.org/r/36545/#comment147071>

Add a little more to the doc. This class does not really "manage" the 
coordinator stream; it is an abstract class that other stream managers 
extend.

Also, renaming it to AbstractCoordinatorStreamManager may be helpful too.



samza-core/src/main/java/org/apache/samza/manager/CoordinatorStreamManager.java 
(line 65)
<https://reviews.apache.org/r/36545/#comment147072>

typo, sends



samza-core/src/main/java/org/apache/samza/manager/CoordinatorStreamManager.java 
(line 96)
<https://reviews.apache.org/r/36545/#comment147073>

no "+"



samza-core/src/main/java/org/apache/samza/manager/CoordinatorStreamManager.java 
(line 112)
<https://reviews.apache.org/r/36545/#comment147074>

I think taskName may be more general, in case we have more information in 
the TaskName or other registration rules. Just a personal idea.



samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
 (line 52)
<https://reviews.apache.org/r/36545/#comment147067>

going with the taskName is fine.



samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
(line 151)
<https://reviews.apache.org/r/36545/#comment147068>

if we use TaskName in the register method, we do not need to change this one.



samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 
245)
<https://reviews.apache.org/r/36545/#comment147069>

same



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(line 185)
<https://reviews.apache.org/r/36545/#comment147070>

same


- Yan Fang


On July 16, 2015, 1:33 p.m., József Márton Jung wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36545/
> ---
> 
> (Updated July 16, 2015, 1:33 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> The following has been refactored: 
> 1. Static inner classes from CoordinatorStreamMessage has been extracted
> 2. Common functionality from CheckpointManager, ChangelogMappingManager and 
> LocalityManager has benn moved to a base class
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml eef3370 
>   samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
> 7445996 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java 
> 55c258f 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
>  6bd1bd3 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
>  b1078bd 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
>  92f8907 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/Delete.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/message

Re: Access to high-watermark from within a Samza job

2015-07-23 Thread Yan Fang
Hi Tommy,

It has not been implemented just because no one is working on it, not for any
other reason. :) If you want to take a stab at it, feel free to do so. That
would be great.

(copycat use case? :)

Cheers,

Fang, Yan
yanfang...@gmail.com

On Thu, Jul 23, 2015 at 1:23 PM, Tommy Becker  wrote:

> I'm writing a Samza job that basically serves to pump data out of Kafka
> into another system.  For my particular use-case, I want to essentially
> process the entire topic as it exists when the job starts and then exit.
> As far as I can tell, there doesn't seem to be a way to do that right now
> because it is impossible for the job to determine the high-watermark of the
> topics it's processing.  I found this issue that mentions adding a
> getHighWatermark() to IncomingMessageEnvelope:
>
> https://issues.apache.org/jira/browse/SAMZA-539
>
> The use-case discussed there seems to be metrics but this API would enable
> mine as well.  This seems pretty trivial to add, is there some reason it
> hasn't been done yet?  Otherwise I can take a stab at it.  Or is there
> another way to do what I need that I'm unaware of?
>
> --
> Tommy Becker
> Senior Software Engineer
>
> Digitalsmiths
> A TiVo Company
>
> www.digitalsmiths.com
> tobec...@tivo.com
>
> 
>
> This email and any attachments may contain confidential and privileged
> material for the sole use of the intended recipient. Any review, copying,
> or distribution of this email (or any attachments) by others is prohibited.
> If you are not the intended recipient, please contact the sender
> immediately and permanently delete this email and any attachments. No
> employee or agent of TiVo Inc. is authorized to conclude any binding
> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
> Inc. may only be made by a signed written agreement.
>


Re: question on commit on changelog

2015-07-22 Thread Yan Fang
Hi Chen Song,

There are two different concepts: *checkpoint* and *changelog*. Checkpoint
is for the offset of the messages, while the changelog is for the kv-store.
The code snippet you show is for the checkpoint, not for the changelog.

{quote}
1. When implementing our Samza task, does each call of process method
triggers a call to TaskInstance.commit?
{quote}

TaskInstance.commit triggers the *checkpoint*. It is triggered every
task.commit.ms (default is 60000 ms). Basically, the RunLoop class calls the
commit method, but only triggers the commit behavior at the configured interval.

If you are talking about the *changelog*, it's not controlled by the commit
method. Instead, every put/delete calls the "send" method of the system Producer.
In terms of how often the "send" really *sends* to the broker (e.g. Kafka),
it depends on your producer's configuration. For example, in Kafka, you can
have the producer send a batch (setting async), or send one message at a time
(setting sync). What it means is that it leaves the System to decide how
to deal with the "send" method.


{quote}
2. Is there a way to buffer these commit activities in memory and flush
periodically? Our job is joining >1mm messages per second using a KV store
and we have a lot of concern for the changelog size, as in the worst case,
the change log will grow as fast as the input log.
{quote}

If you are talking about the checkpoint, you can change task.commit.ms.

If you are thinking of the changelog (kv-store), you can change the
producer's config to batch a few changes and send them to the broker together.
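The two knobs can be sketched in job config. The property names below are my best recollection of the 0.8-era Samza/Kafka settings and should be treated as illustrative, not authoritative:

```properties
# Checkpoint: how often TaskInstance.commit writes offsets (default 60000 ms)
task.commit.ms=60000

# Changelog: batching is a producer concern; for a Kafka system named "kafka",
# systems.kafka.producer.* settings are passed through to the Kafka producer
systems.kafka.producer.producer.type=async
systems.kafka.producer.batch.num.messages=200
```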

I think the guys in the community with more operational experience are able
to tell you what is the best practice.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Wed, Jul 22, 2015 at 9:00 AM, Chen Song  wrote:

> We are trying to understand the order of commits when processing each
> message in a Samza job.
>
> T1: input offset commit
> T2: changelog commit
> T3: output commit
>
> By looking at the code snippet in
>
> https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L155-L171
> ,
> my understanding is that for each input message, Samza always send update
> message on changelog, send the output message and then commit the input
> offset. It makes sense to me at the high level in terms of at least once
> processing.
>
> Specifically, we have two dumb questions:
>
> 1. When implementing our Samza task, does each call of process method
> triggers a call to TaskInstance.commit?
> 2. Is there a way to buffer these commit activities in memory and flush
> periodically? Our job is joining >1mm messages per second using a KV store
> and we have a lot of concern for the changelog size, as in the worst case,
> the change log will grow as fast as the input log.
>
> Chen
>
> --
> Chen Song
>


Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-22 Thread Yan Fang


> On July 21, 2015, 5:42 p.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala, 
> > lines 36-37
> > <https://reviews.apache.org/r/36473/diff/3/?file=1017436#file1017436line36>
> >
> > though it works, prefer to use the "def" here, not only because it has
> > less overhead, but also because it keeps all the methods consistent for better
> > readability. What do you think?
> 
> Roger Hoover wrote:
> Sounds good.  I only baulked on it the first time because I'm not that 
> skilled with Scala type declarations yet. :)  I can make this work
> 
> Roger Hoover wrote:
> I take it back.  It seems it [can't be 
> done](http://www.scala-lang.org/old/node/5082)

aha, sorry for being misleading. What I meant here is to change the "val" 
to a "def": val newCounter = metricGroup.newCounter(_) ==> def newCounter(name: 
String).
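Concretely, the suggested change could look like this (a hypothetical excerpt of the MetricsHelper trait; the eta-expanded `val` eagerly allocates a function object, while the `def` form stays consistent with the other methods):

```scala
// Before: a function value created via eta-expansion
val newCounter = metricGroup.newCounter _

// After: a plain method, consistent with the other new* methods
def newCounter(name: String) = metricGroup.newCounter(name)
```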


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review92429
---


On July 22, 2015, 4:07 a.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36473/
> ---
> 
> (Updated July 22, 2015, 4:07 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-733 Add metrics to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java 
> PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala 
> 8eac8ef 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
>  a277b69 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  7eb14a2 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  PRE-CREATION 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
>  PRE-CREATION 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
>  e63d62c 
> 
> Diff: https://reviews.apache.org/r/36473/diff/
> 
> 
> Testing
> ---
> 
> Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
> stream and that the metrics correctly count how many Elasticsearch documents 
> were created and indexed.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-21 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review92429
---



samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java (line 23)
<https://reviews.apache.org/r/36473/#comment146595>

of the class "the" extends -> of the class "that" extends



samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java (line 29)
<https://reviews.apache.org/r/36473/#comment146607>

can we also have a constructor with the default prefix ""?



samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala (line 29)
<https://reviews.apache.org/r/36473/#comment146598>

the extends -> that extends



samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala (lines 
36 - 37)
<https://reviews.apache.org/r/36473/#comment146606>

though it works, prefer to use the "def" here, not only because it has less 
overhead, but also because it keeps all the methods consistent for better readability. 
What do you think?


- Yan Fang


On July 21, 2015, 5:41 a.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36473/
> ---
> 
> (Updated July 21, 2015, 5:41 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-733 Add metrics to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java 
> PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala 
> 8eac8ef 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
>  a277b69 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  7eb14a2 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  PRE-CREATION 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
>  PRE-CREATION 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
>  e63d62c 
> 
> Diff: https://reviews.apache.org/r/36473/diff/
> 
> 
> Testing
> ---
> 
> Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
> stream and that the metrics correctly count how many Elasticsearch documents 
> were created and indexed.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: Review Request 33419: SAMZA-625: Provide tool to consume changelog and materialize a state store

2015-07-15 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33419/
---

(Updated July 16, 2015, 12:56 a.m.)


Review request for samza.


Changes
---

fix nits.
get rid of writing the latest offset to the file


Bugs: SAMZA-625
https://issues.apache.org/jira/browse/SAMZA-625


Repository: samza


Description
---

Implemented in Java.

* modified build.gradle to have Gradle compile Scala first, because some 
Java code has dependencies on Scala code
* changed the state store name by removing the space (in TaskManager)
* added a Scala-Java conversion method in Util because some classes only accept a 
Scala map
* added Java versions of some configs 
* removed duplicated config in samza-log4j
* added the StorageRecovery class, which does most of the recovery work. The logic 
mimics what happens in SamzaContainer.
* added StateStorageTool, for command-line usage
* unit tests
* docs


Diffs (updated)
-

  checkstyle/import-control.xml 3374f0c 
  docs/learn/documentation/versioned/container/state-management.md 79067bb 
  samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
aeba61a 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala 2feb65b 
  samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java 
PRE-CREATION 
  
samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java 
PRE-CREATION 
  samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java 
d5e24f2 
  samza-shell/src/main/bash/state-storage-tool.sh PRE-CREATION 

Diff: https://reviews.apache.org/r/33419/diff/


Testing
---

tested with multiple partitions and multiple stores recovery.


Thanks,

Yan Fang



Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-14 Thread Yan Fang


> On July 14, 2015, 9:44 p.m., Yan Fang wrote:
> > samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java,
> >  line 24
> > <https://reviews.apache.org/r/36473/diff/1/?file=1010788#file1010788line24>
> >
> > can this class extend MetricsHelper? That could simplify things a little.
> 
> Roger Hoover wrote:
> I don't see how it simplifies things because I have to implement all the 
> methods in the Scala trait.  I'm having trouble getting the newGauge 
> signatures to match.
> 
> ```
> public class ElasticsearchSystemProducerMetrics implements MetricsHelper {
> public final Counter bulkSendSuccess;
> public final Counter inserts;
> public final Counter updates;
> private final MetricsRegistry registry;
> private final String group;
> private final String systemName;
> 
> public interface JFunction<R> {
> R apply();
> }
> 
> public ElasticsearchSystemProducerMetrics(String systemName, 
> MetricsRegistry registry) {
> group = this.getClass().getName();
> this.registry = registry;
> this.systemName = systemName;
> 
> bulkSendSuccess = newCounter("bulk-send-success");
> inserts = newCounter("docs-inserted");
> updates = newCounter("docs-updated");
> }
> 
> @Override
> public Counter newCounter(String name) {
> return MetricsHelper$class.newCounter(this, name);
> }
> 
> @Override
> public <T> Gauge<T> newGauge(String name, T value) {
> return MetricsHelper$class.newGauge(this, name, value);
> }
> 
> @Override
> public <T> Gauge<T> newGauge(String name, JFunction<T> value) {
> return null;
> }
> 
> @Override
> public Timer newTimer(String name) {
> return MetricsHelper$class.newTimer(this, name);
> }
> 
> @Override
> public String getPrefix() {
> return systemName + "-";
> }
> 
> @Override
> public MetricsRegistry registry() {
> return registry;
> }
> 
> @Override
> public String group() {
>     return group;
> }
> }
> ```
> 
> Roger Hoover wrote:
> We really only need counters for this class but have to figure out how to 
> implement the Scala newGauge methods which are tricky.  Would appreciate help 
> if you know how to do it.
> 
> Yan Fang wrote:
> Oh, I see. Sorry for the confusion. I did not realize there was a 
> Java-Scala issue sitting here. OK, I am fine with going with the original 
> approach, which seems clear enough.

Another way is to add a Java version of MetricsHelper. Then 
ElasticsearchSystemProducerMetrics and other future Java-based metrics classes can 
extend it, which helps future implementations. Otherwise, I think the first patch 
is fine. I'll let you make the decision. Thanks. :)
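A rough sketch of what such a Java-friendly base class could look like. All class and method names here are hypothetical stand-ins, not the actual Samza API; Counter and MetricsRegistry are minimal local substitutes for the real types:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-ins for Samza's Counter and MetricsRegistry types.
class Counter {
    private long count = 0;
    public void inc() { count++; }
    public long getCount() { return count; }
}

class MetricsRegistry {
    private final Map<String, Counter> counters = new HashMap<>();
    public Counter newCounter(String group, String name) {
        return counters.computeIfAbsent(group + "." + name, k -> new Counter());
    }
}

// Hypothetical Java base class playing the role of the Scala MetricsHelper
// trait: it owns the group/prefix logic so concrete metrics classes stay tiny.
abstract class MetricsBase {
    private final String prefix;
    private final MetricsRegistry registry;

    protected MetricsBase(String prefix, MetricsRegistry registry) {
        this.prefix = prefix;
        this.registry = registry;
    }

    protected Counter newCounter(String name) {
        return registry.newCounter(getClass().getName(), prefix + name);
    }
}

// A producer metrics class then only declares its counters.
class ProducerMetrics extends MetricsBase {
    public final Counter inserts;

    public ProducerMetrics(String systemName, MetricsRegistry registry) {
        super(systemName + "-", registry);
        inserts = newCounter("docs-inserted");
    }
}

public class MetricsBaseSketch {
    public static void main(String[] args) {
        ProducerMetrics metrics = new ProducerMetrics("es", new MetricsRegistry());
        metrics.inserts.inc();
        metrics.inserts.inc();
        System.out.println(metrics.inserts.getCount()); // prints 2
    }
}
```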


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review91670
---


On July 14, 2015, 6:12 a.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36473/
> ---
> 
> (Updated July 14, 2015, 6:12 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-733 Add metrics to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
>  a277b69 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  7eb14a2 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/36473/diff/
> 
> 
> Testing
> ---
> 
> Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
> stream and that the metrics correctly count how many Elasticsearch documents 
> were created and indexed.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-14 Thread Yan Fang


> On July 14, 2015, 9:44 p.m., Yan Fang wrote:
> > samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java,
> >  line 24
> > <https://reviews.apache.org/r/36473/diff/1/?file=1010788#file1010788line24>
> >
> > can this class extend MetricsHelper? That could simplify things a little.
> 
> Roger Hoover wrote:
> I don't see how it simplifies things because I have to implement all the 
> methods in the Scala trait.  I'm having trouble getting the newGauge 
> signatures to match.
> 
> ```
> public class ElasticsearchSystemProducerMetrics implements MetricsHelper {
> public final Counter bulkSendSuccess;
> public final Counter inserts;
> public final Counter updates;
> private final MetricsRegistry registry;
> private final String group;
> private final String systemName;
> 
> public interface JFunction<R> {
> R apply();
> }
> 
> public ElasticsearchSystemProducerMetrics(String systemName, 
> MetricsRegistry registry) {
> group = this.getClass().getName();
> this.registry = registry;
> this.systemName = systemName;
> 
> bulkSendSuccess = newCounter("bulk-send-success");
> inserts = newCounter("docs-inserted");
> updates = newCounter("docs-updated");
> }
> 
> @Override
> public Counter newCounter(String name) {
> return MetricsHelper$class.newCounter(this, name);
> }
> 
> @Override
> public <T> Gauge<T> newGauge(String name, T value) {
> return MetricsHelper$class.newGauge(this, name, value);
> }
> 
> @Override
> public <T> Gauge<T> newGauge(String name, JFunction<T> value) {
> return null;
> }
> 
> @Override
> public Timer newTimer(String name) {
> return MetricsHelper$class.newTimer(this, name);
> }
> 
> @Override
> public String getPrefix() {
> return systemName + "-";
> }
> 
> @Override
> public MetricsRegistry registry() {
> return registry;
> }
> 
> @Override
> public String group() {
> return group;
> }
> }
> ```
> 
> Roger Hoover wrote:
> We really only need counters for this class but have to figure out how to 
> implement the Scala newGauge methods which are tricky.  Would appreciate help 
> if you know how to do it.

Oh, I see. Sorry for the confusion. I did not realize there was a Java-Scala 
issue sitting here. OK, I am fine with going with the original approach, which seems 
clear enough.


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review91670
---


On July 14, 2015, 6:12 a.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36473/
> ---
> 
> (Updated July 14, 2015, 6:12 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-733 Add metrics to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
>  a277b69 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  7eb14a2 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/36473/diff/
> 
> 
> Testing
> ---
> 
> Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
> stream and that the metrics correctly count how many Elasticsearch documents 
> were created and indexed.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-14 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review91670
---



samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 (line 152)
<https://reviews.apache.org/r/36473/#comment145224>

remove the space



samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 (line 24)
<https://reviews.apache.org/r/36473/#comment145225>

can this class extend MetricsHelper? That could simplify things a little.


- Yan Fang


On July 14, 2015, 6:12 a.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36473/
> ---
> 
> (Updated July 14, 2015, 6:12 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-733 Add metrics to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
>  a277b69 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  7eb14a2 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/36473/diff/
> 
> 
> Testing
> ---
> 
> Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
> stream and that the metrics correctly count how many Elasticsearch documents 
> were created and indexed.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: Another checkpoint tool question

2015-07-14 Thread Yan Fang
Actually, I have not tried this, but I do not think it will work, because
the next offset is the offset you set + 1. There is no mechanism inside
Samza that treats -1 as the latest (though Kafka does). If it works,
let me know. :) I may be missing something.

Another way of doing this is to set the latest offset using the checkpoint
tool. You can get the latest offset of the topic from Kafka using
bin/kafka-run-class.sh kafka.tools.GetOffsetShell
<https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-ConsumerOffsetChecker>
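For example, a hypothetical invocation (broker address and topic are placeholders; `--time -1` asks for the latest offset of each partition, which you would then feed back through the checkpoint tool):

```
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic my-input-topic \
  --time -1
```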

Thanks,

Fang, Yan
yanfang...@gmail.com

On Tue, Jul 14, 2015 at 10:33 AM, Bae, Jae Hyeon  wrote:

> Hi Yan
>
> Thanks for your response. I want to reset the offset just once now. If I
> use systems.system-name.streams.stream-name.samza.reset.offset in the
> configuration file, before restarting the job, I need to remove those
> properties again. That's why I don't want to use that property.
>
> Is it OK setting invalid offset such as -1 or Long.MAX_VALUE?
>
> On Tue, Jul 14, 2015 at 10:16 AM, Yan Fang  wrote:
>
> > Hi Jae,
> >
> > Do you want to reset the offset to the latest one when you *start* the
> job
> > or when the job is *running*?
> >
> > If it's the former one, you can use
> >
> > systems.system-name.samza.offset.default=upcoming
> > systems.system-name.streams.stream-name.samza.reset.offset=true
> >
> > What is the reason that you do not want to use
> > systems.system-name.streams.stream-name.samza.reset.offset
> > ?
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Tue, Jul 14, 2015 at 12:49 AM, Bae, Jae Hyeon 
> > wrote:
> >
> > > Hi
> > >
> > > I want to reset the offset for the job to the latest one, which means I
> > > want to ignore them without using
> > > systems.system-name.streams.stream-name.samza.reset.offset option.
> > >
> > > If I use checkpoint tool and reset the offset as -1 or Long.MAX_VALUE,
> in
> > > my theory, kafka consumer will throw an exception and it will reset
> them
> > as
> > > the latest one, am I right? Otherwise, please let me know how to reset
> > > offsets to the latest one.
> > >
> > > Thank you
> > > Best, Jae
> > >
> >
>


Re: Another checkpoint tool question

2015-07-14 Thread Yan Fang
Hi Jae,

Do you want to reset the offset to the latest one when you *start* the job
or when the job is *running*?

If it's the former one, you can use

systems.system-name.samza.offset.default=upcoming
systems.system-name.streams.stream-name.samza.reset.offset=true

What is the reason that you do not want to use
systems.system-name.streams.stream-name.samza.reset.offset
?

Thanks,

Fang, Yan
yanfang...@gmail.com

On Tue, Jul 14, 2015 at 12:49 AM, Bae, Jae Hyeon  wrote:

> Hi
>
> I want to reset the offset for the job to the latest one, which means I
> want to ignore them without using
> systems.system-name.streams.stream-name.samza.reset.offset option.
>
> If I use checkpoint tool and reset the offset as -1 or Long.MAX_VALUE, in
> my theory, kafka consumer will throw an exception and it will reset them as
> the latest one, am I right? Otherwise, please let me know how to reset
> offsets to the latest one.
>
> Thank you
> Best, Jae
>


Re: Review Request 36089: SAMZA-670 Allow easier access to JMX port

2015-07-13 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36089/#review91533
---



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (line 
634)
<https://reviews.apache.org/r/36089/#comment144956>

I think a better way, which requires far fewer changes, is to call 
something like jmxServer.getJmxUrl or jmxServer.jmxTunelingUrl.

jmxServer can be a variable of the SamzaContainer object.

Then we do not need to change ContainerModel, JobModel, or SamzaContext, 
because there is no reason to put JMX information into those 
three objects.


- Yan Fang


On July 1, 2015, 2:07 p.m., József Márton Jung wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36089/
> ---
> 
> (Updated July 1, 2015, 2:07 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> JMX address of application master and the containers is available through AM 
> UI
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml 3374f0c 
>   
> samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java 
> fd7333b 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java 
> e661e12 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
>  6c1e488 
>   samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java 
> 98a34bc 
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 95a2ce5 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> cbacd18 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> 8ee034a 
>   samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala f343faf 
>   
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
> 9fb1aa9 
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
> 7caad28 
>   
> samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
>  1ce7d25 
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml cf0d2fc 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala 
> 20aa373 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 
> 1445605 
> 
> Diff: https://reviews.apache.org/r/36089/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> József Márton Jung
> 
>



Re: Review Request 36274: SAMZA-401: getCpuTime to truly calculate duty cycle of the event loop

2015-07-13 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36274/#review91527
---



samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala (lines 75 - 
76)
<https://reviews.apache.org/r/36274/#comment144953>

To be more accurate, I think the activeNs measurement should go before totalNs.


- Yan Fang


On July 7, 2015, 7:08 p.m., Luis De Pombo wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36274/
> ---
> 
> (Updated July 7, 2015, 7:08 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-401: getCpuTime to truly calculate duty cycle of the event loop
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
> c292ae47cd89ef0f25dc682c02dd288e2ba6dcc5 
>   samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala 
> 1643070dd710efb9ade9eb5812dabd6fa60ce023 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 
> 2feb65b729b45fbc3b83a75c4072527e3c4e60be 
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
> 64a5844bdb343a3c509cba059b9f3b9a19dc9eff 
> 
> Diff: https://reviews.apache.org/r/36274/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Luis De Pombo
> 
>



Re: Thoughts and observations on Samza

2015-07-13 Thread Yan Fang
can
> > >> borrow/copy &  paste significant chunks of code from Samza's code
> base.
> > >> 5. [code] The subproject would intentionally eliminate support for
> both
> > >> other streaming systems and all deployment systems.
> > >> 6. [code] Attempt to provide a bridge from our SystemConsumer to
> KIP-26
> > >> (copy cat)
> > >> 7. [code] Attempt to provide a bridge from the new subproject's
> > processor
> > >> interface to our legacy StreamTask interface.
> > >> 8. [code/community] Sunset Samza as a TLP when we have a working Kafka
> > >> subproject that has a fault-tolerant container with state management.
> > >>
> > >> It's likely that (6) and (7) won't be fully drop-in. Still, the closer
> > we
> > >> can get, the better it's going to be for our existing community.
> > >>
> > >> One thing that I didn't touch on with (2) is whether any Samza PMC
> > members
> > >> should be rolled into Kafka PMC membership as well (though, Jay and
> > Jakob
> > >> are already PMC members on both). I think that Samza's community
> > deserves
> > >> a
> > >> voice on the PMC, so I'd propose that we roll at least a few PMC
> members
> > >> into the Kafka PMC, but I don't have a strong framework for which
> people
> > >> to
> > >> pick.
> > >>
> > >> Before (8), I think that Samza's TLP can continue to commit bug fixes
> > and
> > >> patches as it sees fit, provided that we openly communicate that we
> > won't
> > >> necessarily migrate new features to the new subproject, and that the
> TLP
> > >> will be shut down after the migration to the Kafka subproject occurs.
> > >>
> > >> Jakob, I could use your guidance here about about how to achieve this
> > from
> > >> an Apache process perspective (sorry).
> > >>
> > >> * Should I just call a vote on this proposal?
> > >> * Should it happen on dev or private?
> > >> * Do committers have binding votes, or just PMC?
> > >>
> > >> Having trouble finding much detail on the Apache wikis. :(
> > >>
> > >> Cheers,
> > >> Chris
> > >>
> > >> On Fri, Jul 10, 2015 at 2:38 PM, Yan Fang 
> wrote:
> > >>
> > >> > Thanks, Jay. This argument persuaded me actually. :)
> > >> >
> > >> > Fang, Yan
> > >> > yanfang...@gmail.com
> > >> >
> > >> > On Fri, Jul 10, 2015 at 2:33 PM, Jay Kreps 
> wrote:
> > >> >
> > >> > > Hey Yan,
> > >> > >
> > >> > > Yeah philosophically I think the argument is that you should
> capture
> > >> the
> > >> > > stream in Kafka independent of the transformation. This is
> > obviously a
> > >> > > Kafka-centric view point.
> > >> > >
> > >> > > Advantages of this:
> > >> > > - In practice I think this is what e.g. Storm people often end up
> > >> doing
> > >> > > anyway. You usually need to throttle any access to a live serving
> > >> > database.
> > >> > > - Can have multiple subscribers and they get the same thing
> without
> > >> > > additional load on the source system.
> > >> > > - Applications can tap into the stream if need be by subscribing.
> > >> > > - You can debug your transformation by tailing the Kafka topic
> with
> > >> the
> > >> > > console consumer
> > >> > > - Can tee off the same data stream for batch analysis or Lambda
> arch
> > >> > style
> > >> > > re-processing
> > >> > >
> > >> > > The disadvantage is that it will use Kafka resources. But the idea
> > is
> > >> > > eventually you will have multiple subscribers to any data source
> (at
> > >> > least
> > >> > > for monitoring) so you will end up there soon enough anyway.
> > >> > >
> > >> > > Down the road the technical benefit is that I think it gives us a
> > good
> > >> > path
> > >> > > towards end-to-end exactly once semantics from source to
> > destination.
> > >> > > Basically the connectors need to 

Re: Thoughts and observations on Samza

2015-07-10 Thread Yan Fang
Thanks, Jay. This argument persuaded me actually. :)

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 10, 2015 at 2:33 PM, Jay Kreps  wrote:

> Hey Yan,
>
> Yeah philosophically I think the argument is that you should capture the
> stream in Kafka independent of the transformation. This is obviously a
> Kafka-centric view point.
>
> Advantages of this:
> - In practice I think this is what e.g. Storm people often end up doing
> anyway. You usually need to throttle any access to a live serving database.
> - Can have multiple subscribers and they get the same thing without
> additional load on the source system.
> - Applications can tap into the stream if need be by subscribing.
> - You can debug your transformation by tailing the Kafka topic with the
> console consumer
> - Can tee off the same data stream for batch analysis or Lambda arch style
> re-processing
>
> The disadvantage is that it will use Kafka resources. But the idea is
> eventually you will have multiple subscribers to any data source (at least
> for monitoring) so you will end up there soon enough anyway.
>
> Down the road the technical benefit is that I think it gives us a good path
> towards end-to-end exactly once semantics from source to destination.
> Basically the connectors need to support idempotence when talking to Kafka
> and we need the transactional write feature in Kafka to make the
> transformation atomic. This is actually pretty doable if you separate
> connector=>kafka problem from the generic transformations which are always
> kafka=>kafka. However I think it is quite impossible to do in a all_things
> => all_things environment. Today you can say "well the semantics of the
> Samza APIs depend on the connectors you use" but it is actually worse than
> that because the semantics actually depend on the pairing of connectors--so
> not only can you probably not get a usable "exactly once" guarantee
> end-to-end it can actually be quite hard to reverse engineer what property
> (if any) your end-to-end flow has if you have heterogeneous systems.
>
> -Jay
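The idempotence idea Jay describes for the connector=>Kafka hop can be made concrete with a small sketch: tag each record with its source offset, and have the sink skip anything at or below the last offset it already wrote, so a retried batch cannot create duplicates. All names here (IdempotentSink, write, etc.) are illustrative assumptions, not a real Samza or Kafka API; the transactional-write half of the proposal is not shown.

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of an idempotent sink: records carry their source offset,
// and the sink drops anything it has already written. Replaying a failed
// batch then leaves the output unchanged.
public class IdempotentSink {
    private final List<String> written = new ArrayList<>();
    private long lastWrittenOffset = -1;

    // Returns true if the record was appended, false if it was a duplicate.
    public boolean write(long sourceOffset, String record) {
        if (sourceOffset <= lastWrittenOffset) {
            return false;               // already written; retry is a no-op
        }
        written.add(record);
        lastWrittenOffset = sourceOffset;
        return true;
    }

    public List<String> contents() {
        return written;
    }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        sink.write(0, "a");
        sink.write(1, "b");
        // Simulated retry after a failure: offsets 0-1 are replayed, then 2.
        sink.write(0, "a");
        sink.write(1, "b");
        sink.write(2, "c");
        System.out.println(sink.contents()); // [a, b, c]
    }
}
```

Because each connector=>Kafka hop is idempotent, only the kafka=>kafka transformations need atomicity, which is the separation the paragraph above argues for.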
>
> On Fri, Jul 10, 2015 at 2:00 PM, Yan Fang  wrote:
>
> > {quote}
> > maintained in a separate repository and retaining the existing
> > committership but sharing as much else as possible (website, etc)
> > {quote}
> >
> > Overall, I agree on this idea. Now the question is more about "how to do
> > it".
> >
> > On the other hand, one thing I want to point out is that, if we decide to
> > go this way, how do we want to support the
> > otherSystem-transformation-otherSystem use case?
> >
> > Basically, there are four user groups here:
> >
> > 1. Kafka-transformation-Kafka
> > 2. Kafka-transformation-otherSystem
> > 3. otherSystem-transformation-Kafka
> > 4. otherSystem-transformation-otherSystem
> >
> > For group 1, they can easily use the new Samza library to achieve this. For
> > groups 2 and 3, they can use copyCat -> transformation -> Kafka or Kafka ->
> > transformation -> copyCat.
> >
> > The problem is for group 4. Do we want to abandon this or still support it?
> > Of course, this use case can be achieved by using copyCat -> transformation
> > -> Kafka -> transformation -> copyCat; the question is how we persuade them
> > to adopt this long chain. If yes, it will also be a win for Kafka. Or, if
> > no one in this community is actually doing this so far, maybe it is ok to
> > not support group 4 directly.
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Fri, Jul 10, 2015 at 12:58 PM, Jay Kreps  wrote:
> >
> > > Yeah I agree with this summary. I think there are kind of two questions
> > > here:
> > > 1. Technically does alignment/reliance on Kafka make sense
> > > 2. Branding wise (naming, website, concepts, etc) does alignment with
> > Kafka
> > > make sense
> > >
> > > Personally I do think both of these things would be really valuable,
> and
> > > would dramatically alter the trajectory of the project.
> > >
> > > My preference would be to see if people can mostly agree on a direction
> > > rather than splintering things off. From my point of view the ideal
> > outcome
> > > of all the options discussed would be to make Samza a closely aligned
> > > subproject, maintained in a separate repository and retaining the
> > existing
> > > committership but sharing as much else as possible (website, etc). No
> > idea
> > > about how these things work, Jacob, you probably know more.
> > >
>

Re: Question on newBlockingQueue in BlockingEnvelopeMap

2015-07-10 Thread Yan Fang
Hi Jae,

I think the messages are not "lost"; instead, they all go to one partition
in your "shared queue" implementation.

If you check the code in BlockingEnvelopeMap (line 123), it puts all the
messages in the queue of one partition.
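To make this concrete, here is a minimal, self-contained sketch of that failure mode. It uses plain Java with an int partition id standing in for SystemStreamPartition and a String for the message envelope; these are illustrative stand-ins, not the real Samza classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of BlockingEnvelopeMap's per-partition buffering: register() keys
// one queue per partition. If newBlockingQueue() returns a shared instance,
// every partition's "buffer" is the same object, so messages for different
// partitions are interleaved in one queue.
public class SharedQueueSketch {
    // Registers two partitions with the given queues, delivers one message
    // to each, and reports how many messages each partition's buffer holds.
    static Map<Integer, Integer> bufferSizes(BlockingQueue<String> q0,
                                             BlockingQueue<String> q1) {
        Map<Integer, BlockingQueue<String>> buffered = new HashMap<>();
        buffered.putIfAbsent(0, q0);   // mimics register(ssp0)
        buffered.putIfAbsent(1, q1);   // mimics register(ssp1)
        buffered.get(0).add("msg-for-p0");
        buffered.get(1).add("msg-for-p1");
        Map<Integer, Integer> sizes = new HashMap<>();
        sizes.put(0, buffered.get(0).size());
        sizes.put(1, buffered.get(1).size());
        return sizes;
    }

    public static void main(String[] args) {
        // Shared queue, as in the S3Consumer above: both partitions point at
        // the same object, so all messages pile up in "one partition".
        BlockingQueue<String> shared = new LinkedBlockingQueue<>();
        System.out.println(bufferSizes(shared, shared));

        // One queue per partition (the filereader-style fix): each buffer
        // holds exactly its own message.
        System.out.println(bufferSizes(new LinkedBlockingQueue<>(),
                                       new LinkedBlockingQueue<>()));
    }
}
```

When the container then polls one partition it drains the shared queue, so messages intended for the other partitions vanish from their buffers, which is why they appear "lost" even though they were delivered.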

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 10, 2015 at 12:36 PM, Bae, Jae Hyeon  wrote:

> Hi Samza devs and users
>
> I wrote a customized Samza S3 consumer which downloads files from S3 and puts
> messages in BlockingEnvelopeMap. It was straightforward because there's a
> nice example, filereader. I tried a small optimization with the
> newBlockingQueue() method, guessing that a single shared queue would be
> fine because the Samza container is single-threaded. I added the following
> code:
>
>
> public S3Consumer(String systemName, Config config, MetricsRegistry
> registry) {
> queueSize = config.getInt("systems." + systemName + ".queue.size",
> 1);
> bucket = config.get("systems." + systemName + ".bucket");
> prefix = config.get("systems." + systemName + ".prefix");
>
> queue = new LinkedBlockingQueue<>(queueSize);
>
> recordCounter = registry.newCounter(this.getClass().getName(),
> "processed_records");
> }
>
> @Override
> protected BlockingQueue newBlockingQueue() {
> return queue; // single queue
> }
>
> Unfortunately, I observed significant message loss with this
> implementation. I suspected the queue might have dropped messages, so I
> changed the newBlockingQueue() implementation to be the same as filereader's.
>
> @Override
> protected BlockingQueue newBlockingQueue() {
> return new LinkedBlockingQueue<>(queueSize);
> }
>
> Then, message loss didn't happen again.
>
> Do you have any idea why it went wrong?
>
> Thank you
> Best, Jae
>


Re: Thoughts and observations on Samza

2015-07-10 Thread Yan Fang
> > > > > >> > do
> > >> > > > > >> > > it since we felt it would be limiting. From my point of
> > view
> > >> > the
> > >> > > > > three
> > >> > > > > >> > > things have changed (1) Kafka is now really heavily
> used
> > for
> > >> > > > stream
> > >> > > > > >> > > processing, (2) we learned that abstracting out the
> > stream
> > >> > well
> > >> > > is
> > >> > > > > >> > > basically impossible, (3) we learned it is really hard
> to
> > >> keep
> > >> > > the
> > >> > > > > two
> > >> > > > > >> > > things feeling like a single product.
> > >> > > > > >> > >
> > >> > > > > >> > > -Jay
> > >> > > > > >> > >
> > >> > > > > >> > >
> > >> > > > > >> > > On Mon, Jul 6, 2015 at 3:37 AM, Martin Kleppmann <
> > >> > > > > >> mar...@kleppmann.com>
> > >> > > > > >> > > wrote:
> > >> > > > > >> > >
> > >> > > > > >> > >> Hi all,
> > >> > > > > >> > >>
> > >> > > > > >> > >> Lots of good thoughts here.
> > >> > > > > >> > >>
> > >> > > > > >> > >> I agree with the general philosophy of tying Samza
> more
> > >> > firmly
> > >> > > to
> > >> > > > > >> Kafka.
> > >> > > > > >> > >> After I spent a while looking at integrating other
> > message
> > >> > > > brokers
> > >> > > > > >> (e.g.
> > >> > > > > >> > >> Kinesis) with SystemConsumer, I came to the conclusion
> > that
> > >> > > > > >> > SystemConsumer
> > >> > > > > >> > >> tacitly assumes a model so much like Kafka's that
> pretty
> > >> much
> > >> > > > > nobody
> > >> > > > > >> but
> > >> > > > > >> > >> Kafka actually implements it. (Databus is perhaps an
> > >> > exception,
> > >> > > > but
> > >> > > > > >> it
> > >> > > > > >> > >> isn't widely used outside of LinkedIn.) Thus, making
> > Samza
> > >> > > fully
> > >> > > > > >> > dependent
> > >> > > > > >> > >> on Kafka acknowledges that the system-independence was
> > >> never
> > >> > as
> > >> > > > > real
> > >> > > > > >> as
> > >> > > > > >> > we
> > >> > > > > >> > >> perhaps made it out to be. The gains of code reuse are
> > >> real.
> > >> > > > > >> > >>
> > >> > > > > >> > >> The idea of decoupling Samza from YARN has also always
> > been
> > >> > > > > >> appealing to
> > >> > > > > >> > >> me, for various reasons already mentioned in this
> > thread.
> > >> > > > Although
> > >> > > > > >> > making
> > >> > > > > >> > >> Samza jobs deployable on anything (YARN/Mesos/AWS/etc)
> > >> seems
> > >> > > > > >> laudable,
> > >> > > > > >> > I am
> > >> > > > > >> > >> a little concerned that it will restrict us to a
> lowest
> > >> > common
> > >> > > > > >> > denominator.
> > >> > > > > >> > >> For example, would host affinity (SAMZA-617) still be
> > >> > possible?
> > >> > > > For
> > >> > > > > >> jobs
> > >> > > > > >> > >> with large amounts of state, I think SAMZA-617 would
> be
> > a

Re: Review Request 35933: SAMZA-449 Expose RocksDB statistic

2015-07-03 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35933/#review90373
---



samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
 (line 133)
<https://reviews.apache.org/r/35933/#comment143410>

We can remove the "KeyValueStoreMetrics" here, right? Change to "new 
RocksDbStatisticMetrics(storeName, options, metrics)".



samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
 (line 245)
<https://reviews.apache.org/r/35933/#comment143411>

the "[[]]" is not very common. And I do not see anywere in Samza we are 
using. It's better to remove them, or use {@link} if you want to link to the 
class.



samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbStatisticMetrics.scala
 (line 213)
<https://reviews.apache.org/r/35933/#comment143409>

Is any code using these methods (lines 213-219)? If not, it's safe to 
remove them. Also, users can directly access the variables to get the values 
if they want.


- Yan Fang


On July 3, 2015, 11:05 p.m., Gustavo Anatoly F. V. Solís wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/35933/
> ---
> 
> (Updated July 3, 2015, 11:05 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Expose RocksDB statistics
> 
> 
> Diffs
> -
> 
>   
> samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
>  a423f7bd6c43461e051b5fd1f880dd01db785991 
>   
> samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbStatisticMetrics.scala
>  PRE-CREATION 
>   
> samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
>  a428a16bc1e9ab4980a6f17db4fd810057d31136 
> 
> Diff: https://reviews.apache.org/r/35933/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Gustavo Anatoly F. V. Solís
> 
>



Re: Thoughts and observations on Samza

2015-07-01 Thread Yan Fang
Overall, I agree with coupling with Kafka more tightly, because Samza is de
facto based on Kafka and should leverage what Kafka has. At the same time,
Kafka does not need to reinvent what Samza already has. I also like the
idea of separating ingestion and transformation.

But it is a little difficult for me to imagine how Samza will look,
and I feel Chris and Jay differ a little on how Samza
should look.

*** Will it look like what Jay's code shows (a client of Kafka)? And
the user's application code calls this client?

1. If we make Samza a Kafka library (like what the code shows), how
do we implement auto-balancing and fault tolerance? Are they taken care of by
the Kafka broker or some other mechanism, such as a "Samza worker" (just
making up the name)?

2. What about other features, such as auto-scaling, shared state,
monitoring?


*** If we have Samza standalone, (is this what Chris suggests?)

1. We still need to ingest data from Kafka and produce to it. Then it
becomes the same as what Samza looks like now, except it does not rely on
YARN anymore.

2. If it is standalone, how can it leverage Kafka's metrics, logs, etc.? Use
Kafka code as the dependency?


Thanks,

Fang, Yan
yanfang...@gmail.com

On Wed, Jul 1, 2015 at 5:46 PM, Guozhang Wang  wrote:

> Read through the code example and it looks good to me. A few thoughts
> regarding deployment:
>
> Today Samza deploys as executable runnable like:
>
> deploy/samza/bin/run-job.sh --config-factory=... --config-path=file://...
>
> And this proposal advocates for deploying Samza more as embedded libraries
> in user application code (ignoring the terminology since it is not the same
> as the prototype code):
>
> StreamTask task = new MyStreamTask(configs);
> Thread thread = new Thread(task);
> thread.start();
>
> I think both of these deployment modes are important for different types of
> users. That said, I think making Samza purely standalone is still
> sufficient for either runnable or library modes.
>
> Guozhang
>
> On Tue, Jun 30, 2015 at 11:33 PM, Jay Kreps  wrote:
>
> > Looks like gmail mangled the code example, it was supposed to look like
> > this:
> >
> > Properties props = new Properties();
> > props.put("bootstrap.servers", "localhost:4242");
> > StreamingConfig config = new StreamingConfig(props);
> > config.subscribe("test-topic-1", "test-topic-2");
> > config.processor(ExampleStreamProcessor.class);
> > config.serialization(new StringSerializer(), new StringDeserializer());
> > KafkaStreaming container = new KafkaStreaming(config);
> > container.run();
> >
> > -Jay
> >
> > On Tue, Jun 30, 2015 at 11:32 PM, Jay Kreps  wrote:
> >
> > > Hey guys,
> > >
> > > This came out of some conversations Chris and I were having around
> > whether
> > > it would make sense to use Samza as a kind of data ingestion framework
> > for
> > > Kafka (which ultimately lead to KIP-26 "copycat"). This kind of
> combined
> > > with complaints around config and YARN and the discussion around how to
> > > best do a standalone mode.
> > >
> > > So the thought experiment was, given that Samza was basically already
> > > totally Kafka specific, what if you just embraced that and turned it
> into
> > > something less like a heavyweight framework and more like a third Kafka
> > > client--a kind of "producing consumer" with state management
> facilities.
> > > Basically a library. Instead of a complex stream processing framework
> > this
> > > would actually be a very simple thing, not much more complicated to use
> > or
> > > operate than a Kafka consumer. As Chris said, when we thought about it, a
> > > lot of what Samza (and the other stream processing systems) were doing
> > > seemed like kind of a hangover from MapReduce.
> > >
> > > Of course you need to ingest/output data to and from the stream
> > > processing. But when we actually looked into how that would work, Samza
> > > isn't really an ideal data ingestion framework for a bunch of reasons.
> To
> > > really do that right you need a pretty different internal data model
> and
> > > set of apis. So what if you split them and had an api for Kafka
> > > ingress/egress (copycat AKA KIP-26) and a separate api for Kafka
> > > transformation (Samza).
> > >
> > > This would also allow really embracing the same terminology and
> > > conventions. One complaint about the current state is that the two
> > systems
> > > kind of feel bolted on. Terminology like "stream" vs "topic" and
> > different
> > > config and monitoring systems means you kind of have to learn Kafka's
> > way,
> > > then learn Samza's slightly different way, then kind of understand how
> > they
> > > map to each other, which having walked a few people through this is
> > > surprisingly tricky for folks to get.
> > >
> > > Since I have been spending a lot of time on airplanes I hacked up an
> > > earnest but still somewhat incomplete prototype of what this would look
> > > like. This is just unceremoniously dumped into Kafka as it requi
