Review Request 37428: SAMZA-723: hello-samza hangs when using StreamAppender

2015-08-12 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37428/
---

Review request for samza.


Bugs: SAMZA-723
https://issues.apache.org/jira/browse/SAMZA-723


Repository: samza


Description
---

Fixed two parts of the problem:
1. Do not start the StreamAppender until the JobCoordinator is running.
2. Fixed the deadlock between the producer thread and the main thread.

More explanation is in the JIRA.


Diffs
---

  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala a926ce6
  samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java 209296d
  samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java 8948453
  samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java 3e81240

Diff: https://reviews.apache.org/r/37428/diff/


Testing
---


Thanks,

Yan Fang



Re: Review Request 37102: SAMZA-753: BrokerProxy stop should shutdown kafka consumer first

2015-08-12 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37102/
---

(Updated Aug. 13, 2015, 1:12 a.m.)


Review request for samza.


Changes
---

Added a unit test for the closing behavior.


Bugs: SAMZA-753
https://issues.apache.org/jira/browse/SAMZA-753


Repository: samza


Description
---

Shut down the Kafka consumer before interrupting the BrokerProxy.
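
The general shutdown-ordering pattern behind this change (an illustrative
sketch only, not the actual patch; see the diff below for the real change,
and note the consumer type and join timeout are assumptions):

    import kafka.javaapi.consumer.SimpleConsumer;

    class ShutdownOrdering {
      // Close the resource the fetcher thread blocks on first, so that the
      // subsequent interrupt/join does not hang on an in-flight fetch.
      static void stop(SimpleConsumer consumer, Thread fetcher) throws InterruptedException {
        consumer.close();    // unblocks any fetch the thread is waiting on
        fetcher.interrupt(); // then ask the fetch loop to exit
        fetcher.join(1000);  // and wait briefly for it to finish
      }
    }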


Diffs (updated)
---

  samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 614f33f
  samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala e285dec

Diff: https://reviews.apache.org/r/37102/diff/


Testing
---


Thanks,

Yan Fang



RE: Missing a change log offset for SystemStreamPartition

2015-08-12 Thread Jordi Blasi Uribarri
Thanks. You got the idea. For the moment I am going to start with the init
method. The change-capture system is what I wanted to develop (but with a
parallel job, there is no way to do it). The origin of the data will be a
(soon to be developed) web admin. I was thinking of add and delete commands
to alter the data, but I guess that if replication is an issue, I had better
wait, as it can be tricky (if not impossible) to ensure that updates are
distributed to all containers.

I guess that my first problem is with the semantics of Samza (yes,
containers are the copies :)).

Thanks for your help.

Jordi

-----Original Message-----
From: Yi Pan [mailto:nickpa...@gmail.com]
Sent: Wednesday, August 12, 2015 1:50
To: dev@samza.apache.org
Subject: Re: Missing a change log offset for SystemStreamPartition

Hi, Jordi,

I see your use case now. Essentially, you want to bootstrap adjunct data
into a Samza job that consumes a stream and does a stream-table join with
the pre-loaded adjunct data in the store. We have plenty of use cases of
this kind at LinkedIn. If your adjunct data set is small and static, you can
simply load it in the init() method from external data sources. If your
adjunct data set is big and may receive updates as well, one popular setup
at LinkedIn is:
1) Have a change-capture system associated with the external source which is
also able to scan the whole data source to bootstrap.
2) The change-capture system writes each record / record update into a Kafka
system stream (i.e. change-capture-topic) to be consumed by the downstream
Samza job.
3) The downstream Samza job can be configured to bootstrap on the
change-capture-topic and consume from input topic topicA. The Samza job will
then simply bootstrap by consuming all messages in the change-capture-topic
and updating the local KV-store, before it starts consuming input topicA, as
sketched below.
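
A minimal sketch of what step 3 could look like in the job's properties
file (the system and topic names are hypothetical; check the stream
configuration docs for the release you are on):

    # Consume both the main input and the change-capture topic.
    task.inputs=kafka.topicA,kafka.change-capture-topic

    # Bootstrap: fully catch up on change-capture-topic (from the oldest
    # offset) before processing messages from any other input.
    systems.kafka.streams.change-capture-topic.samza.bootstrap=true
    systems.kafka.streams.change-capture-topic.samza.reset.offset=true
    systems.kafka.streams.change-capture-topic.samza.offset.default=oldest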
The change-capture system at LinkedIn is called Databus, which scans MySQL
binlogs and sends the transactions into Kafka. Martin has written up some
PostgreSQL change-capture work here:
https://issues.apache.org/jira/browse/SAMZA-212. What's your external source?

Does the above sound like the solution you are looking for?

As for your last question about replicating the store to multiple containers
(I assume that you meant containers when saying "all copies of the job"),
there is ongoing work on broadcast streams here:
https://issues.apache.org/jira/browse/SAMZA-676.
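
For reference, when that work later shipped, broadcast inputs were
configured along these lines (a hedged sketch based on the released feature,
not this in-progress patch; the topic name and partition are hypothetical):

    # Deliver partition 0 of broadcast-topic to every task in the job.
    task.broadcast.inputs=kafka.broadcast-topic#0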

-Yi

On Tue, Aug 11, 2015 at 1:11 PM, Jordi Blasi Uribarri 
wrote:

> Hi,
>
> What I am trying to develop is (I think) an evolution of the stream-table
> join. For every message processed, depending on the value of a variable
> contained in it, go to the store and decide to which topic it must be
> sent.
> It is some kind of (very simple) workflow manager. I can read from a
> store the data that guides the decision: key -> previous step, value ->
> output topic.
>
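
A rough sketch of what such a process() method could look like (the class
name, the extractStep() helper, and the "routes" store name are all
hypothetical; the store would be declared in the job's properties file):

    import org.apache.samza.config.Config;
    import org.apache.samza.storage.kv.KeyValueStore;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.task.InitableTask;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskContext;
    import org.apache.samza.task.TaskCoordinator;

    public class RoutingTask implements StreamTask, InitableTask {
      // Maps "previous step" -> "output topic".
      private KeyValueStore<String, String> routes;

      @Override
      @SuppressWarnings("unchecked")
      public void init(Config config, TaskContext context) throws Exception {
        routes = (KeyValueStore<String, String>) context.getStore("routes");
      }

      @Override
      public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
          TaskCoordinator coordinator) {
        String step = extractStep(envelope.getMessage()); // hypothetical helper
        String outputTopic = routes.get(step);            // the stream-table lookup
        if (outputTopic != null) {
          collector.send(new OutgoingMessageEnvelope(
              new SystemStream("kafka", outputTopic), envelope.getMessage()));
        }
      }

      // Hypothetical: pull the routing variable out of the message payload.
      private String extractStep(Object message) {
        return String.valueOf(message);
      }
    }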
> My problem is how to make sure this information is already available to
> the job when the process() method is called. I was trying to load this
> info with a (let's call it) configuration job that receives all the data
> pairs and loads them into the store. As I see this is not supported, so I
> need another way to get this info into the store.
>
> I see the same problem with the zip code example in the documentation:
> how do the zip codes get into the store so they can be joined against
> the incoming messages?
>
> I am thinking of a solution that, during the initialization process,
> reads the data from an external source, maybe a MySQL server, and loads
> it into the store. Then, while processing the messages, the job could
> access the data. Is this a correct way of doing it?
>
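
This is the init() approach Yi suggested above for small, static data sets;
a minimal sketch, assuming a hypothetical JDBC source and the same
hypothetical "routes" store as in the process() sketch above (the config
key, table, and column names are illustrative only):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    import org.apache.samza.config.Config;
    import org.apache.samza.storage.kv.KeyValueStore;
    import org.apache.samza.task.InitableTask;
    import org.apache.samza.task.TaskContext;

    public class RouteLoadingTask implements InitableTask {
      private KeyValueStore<String, String> routes;

      @Override
      @SuppressWarnings("unchecked")
      public void init(Config config, TaskContext context) throws Exception {
        routes = (KeyValueStore<String, String>) context.getStore("routes");
        // Copy the routing table from the external source into the local store.
        try (Connection conn = DriverManager.getConnection(config.get("myapp.routes.jdbc.url"));
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT step, topic FROM routes")) {
          while (rs.next()) {
            routes.put(rs.getString("step"), rs.getString("topic"));
          }
        }
      }
    }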
> I am not sure if I am explaining correctly what I am trying to do.
>
> The other question I have is: once the data is loaded into the store, is
> it replicated to all the copies of the job?
>
> thanks,
>
> Jordi
>
> 
> From: Yi Pan [nickpa...@gmail.com]
> Sent: Tuesday, August 11, 2015 19:03
> To: dev@samza.apache.org
> Subject: Re: Missing a change log offset for SystemStreamPartition
>
> Hi, Jordi,
>
> The local KV-store is meant to be accessed by the Samza container
> locally on the machine. If you were referring to the use case where the
> local KV-store is accessed by a container from a different Samza job,
> that is not supported. And what exactly does it mean when you say "load
> the DB to be able to use it from the consuming job"? We may be of more
> help if we know your use case in more detail.
>
> Thanks!
>
> -Yi
>
>
>
> On Tue, Aug 11, 2015 at 3:00 AM, Jordi Blasi Uribarri
> wrote:
>
> > InitableTask was missing. That explains another problem that I was
> > experiencing (and had left for later). Anyway, the exception was still
> > there until I commented out the changelog definition line in the
> > properties file:
> >
> > #stores.test12db.changelog=kafka.test12db-changelog
> >
>
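
For context, a changelog-enabled store normally needs the full store
configuration around that line; a hedged sketch (the factory and serde
choices here are illustrative and version-dependent):

    stores.test12db.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
    stores.test12db.key.serde=string
    stores.test12db.msg.serde=string
    stores.test12db.changelog=kafka.test12db-changelog

    serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory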