Re: Thoughts and obesrvations on Samza

2015-07-01 Thread Yan Fang
Overall, I agree with coupling Samza more tightly with Kafka. Samza is de
facto based on Kafka, so it should leverage what Kafka has. At the same time,
Kafka does not need to reinvent what Samza already has. I also like the
idea of separating ingestion from transformation.

But it is a little difficult for me to imagine what Samza will look like,
and I feel Chris and Jay differ a little on what Samza should look like.

*** Will it look like what Jay's code shows (a client of Kafka), with the
user's application code calling this client?

1. If we make Samza a library of Kafka (like what the code shows), how
do we implement auto-balancing and fault tolerance? Are they taken care of by
the Kafka broker or by some other mechanism, such as a "Samza worker" (a name
I just made up)?

2. What about other features, such as auto-scaling, shared state,
monitoring?


*** If we have Samza standalone (is this what Chris suggests?):

1. We still need to ingest data from Kafka and produce to it. Then it
becomes the same as what Samza looks like now, except that it no longer
relies on YARN.

2. If it is standalone, how can it leverage Kafka's metrics, logs, etc.? By
using the Kafka code as a dependency?


Thanks,

Fang, Yan
yanfang...@gmail.com

On Wed, Jul 1, 2015 at 5:46 PM, Guozhang Wang  wrote:

> Read through the code example and it looks good to me. A few thoughts
> regarding deployment:
>
> Today Samza deploys as an executable, run like this:
>
> deploy/samza/bin/run-job.sh --config-factory=... --config-path=file://...
>
> This proposal advocates deploying Samza more as an embedded library
> in user application code (ignoring the terminology, since it is not the same
> as in the prototype code):
>
> StreamTask task = new MyStreamTask(configs);
> Thread thread = new Thread(task);
> thread.start();
>
> I think both of these deployment modes are important for different types of
> users. That said, I think making Samza purely standalone is still
> sufficient for either runnable or library modes.
>
> Guozhang
>
> On Tue, Jun 30, 2015 at 11:33 PM, Jay Kreps  wrote:
>
> > Looks like gmail mangled the code example, it was supposed to look like
> > this:
> >
> > Properties props = new Properties();
> > props.put("bootstrap.servers", "localhost:4242");
> > StreamingConfig config = new StreamingConfig(props);
> > config.subscribe("test-topic-1", "test-topic-2");
> > config.processor(ExampleStreamProcessor.class);
> > config.serialization(new StringSerializer(), new StringDeserializer());
> > KafkaStreaming container = new KafkaStreaming(config);
> > container.run();
> >
> > -Jay
> >
> > On Tue, Jun 30, 2015 at 11:32 PM, Jay Kreps  wrote:
> >
> > > Hey guys,
> > >
> > > This came out of some conversations Chris and I were having around
> > whether
> > > it would make sense to use Samza as a kind of data ingestion framework
> > for
> > > Kafka (which ultimately led to KIP-26 "copycat"). This kind of
> combined
> > > with complaints around config and YARN and the discussion around how to
> > > best do a standalone mode.
> > >
> > > So the thought experiment was, given that Samza was basically already
> > > totally Kafka specific, what if you just embraced that and turned it
> into
> > > something less like a heavyweight framework and more like a third Kafka
> > > client--a kind of "producing consumer" with state management
> facilities.
> > > Basically a library. Instead of a complex stream processing framework
> > this
> > > would actually be a very simple thing, not much more complicated to use
> > or
> > > operate than a Kafka consumer. As Chris said, when we thought about it,
> > > a lot of what Samza (and the other stream processing systems) was doing
> > > seemed like kind of a hangover from MapReduce.
> > >
> > > Of course you need to ingest/output data to and from the stream
> > > processing. But when we actually looked into how that would work, Samza
> > > isn't really an ideal data ingestion framework for a bunch of reasons.
> To
> > > really do that right you need a pretty different internal data model
> and
> > > set of apis. So what if you split them and had an api for Kafka
> > > ingress/egress (copycat AKA KIP-26) and a separate api for Kafka
> > > transformation (Samza).
> > >
> > > This would also allow really embracing the same terminology and
> > > conventions. One complaint about the current state is that the two
> > systems
> > > kind of feel bolted on. Terminology like "stream" vs "topic" and
> > different
> > > config and monitoring systems means you kind of have to learn Kafka's
> > way,
> > > then learn Samza's slightly different way, then kind of understand how
> > they
> > > map to each other, which having walked a few people through this is
> > > surprisingly tricky for folks to get.
> > >
> > > Since I have been spending a lot of time on airplanes I hacked up an
> > > earnest but still somewhat incomplete prototype of what this would look
> > > like. This is just unceremoniously dumped into Kafka as it requi

Re: [VOTE] Apache Samza 0.9.1 RC1

2015-07-01 Thread Jakob Homan
+1 (binding).  Artifacts are good, tests pass, code looks good.  Found a
minor issue with test-patch (SAMZA-726), but it doesn't affect the release.

It's pretty standard to run weekend- or holiday-straddling votes for five days.

-Jakob


On 1 July 2015 at 20:49, Milinda Pathirage  wrote:
> +1 for extending the voting period.
>
> Thanks
> Milinda
>
> On Wed, Jul 1, 2015 at 8:27 PM, Yi Pan  wrote:
>
>> Hi, team,
>>
>> 72 hours have passed. So far, we got 2 binding votes and 2 non-binding
>> votes, all +1. I am proposing to extend 1 more day to 12:00pm 7/2/2015
>> Thursday.
>>
>> Does it sound good to everyone?
>>
>> Thanks!
>>
>> -Yi
>>
>> On Wed, Jul 1, 2015 at 12:15 PM, Navina Ramesh
>> > > wrote:
>>
>> > +1
>> >
>> > Verified MD5 & asc signatures. Local build & check-all.sh passed.
>> >
>> > Thanks!
>> > Navina
>> >
>> > On 6/30/15, 7:13 PM, "Chris Riccomini"  wrote:
>> >
>> > >+1
>> > >
>> > >Verified MD5, and asc signature. Build locally, and all tests pass.
>> > >
>> > >Cheers,
>> > >Chris
>> > >
>> > >On Tue, Jun 30, 2015 at 1:26 PM, Milinda Pathirage <
>> mpath...@umail.iu.edu
>> > >
>> > >wrote:
>> > >
>> > >> +1 (non-binding)
>> > >>
>> > >> Verified signature. Tested locally using ./bin/check-all.sh.
>> > >>
>> > >> Thanks
>> > >> Milinda
>> > >>
>> > >> On Tue, Jun 30, 2015 at 2:10 AM, Yan Fang 
>> wrote:
>> > >>
>> > >> > +1
>> > >> >
>> > >> > Verified MD5, Signature.
>> > >> >
>> > >> > Tested locally.
>> > >> >
>> > >> > Thanks,
>> > >> >
>> > >> > Fang, Yan
>> > >> > yanfang...@gmail.com
>> > >> >
>> > >> > On Sun, Jun 28, 2015 at 12:31 PM, Yi Pan 
>> wrote:
>> > >> >
>> > >> > > Hey all,
>> > >> > >
>> > >> > > This is a call for a vote on a release of Apache Samza 0.9.1. This
>> > >>is a
>> > >> > > bug-fix release against 0.9.0.
>> > >> > >
>> > >> > > The release candidate can be downloaded from here:
>> > >> > >
>> > >> > > http://people.apache.org/~nickpan47/samza-0.9.1-rc1/
>> > >> > >
>> > >> > > The release candidate is signed with pgp key 911402D8, which is
>> > >> > > included in the repository's KEYS file:
>> > >> > >
>> > >> > >
>> > >> > >
>> > >> >
>> > >>
>> > >>
>> >
>> https://git-wip-us.apache.org/repos/asf?p=samza.git;a=blob_plain;f=KEYS;h
>> > >>b=6f5bafb6cd93934781161eb6b1868d11ea347c95
>> > >> > >
>> > >> > > and can also be found on keyservers:
>> > >> > >
>> > >> > > http://pgp.mit.edu/pks/lookup?op=get&search=0x911402D8
>> > >> > >
>> > >> > > The git tag is release-0.9.1-rc1 and signed with the same pgp key:
>> > >> > >
>> > >> > >
>> > >> > >
>> > >> >
>> > >>
>> > >>
>> >
>> https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=e78b9e7f34650
>> > >>538b4bb68b338eb472b98a5709e
>> > >> > >
>> > >> > > Test binaries have been published to Maven's staging repository,
>> and
>> > >> are
>> > >> > > available here:
>> > >> > >
>> > >>
>> https://repository.apache.org/content/repositories/orgapachesamza-1007/
>> > >> > >
>> > >> > > Note release 0.9.1 is still supporting JDK6 and the binaries were
>> > >>built
>> > >> > > with JDK6 without incident.
>> > >> > >
>> > >> > > 6 critical bugs were resolved for this release:
>> > >> > >
>> > >> > >
>> > >> > >
>> > >> >
>> > >>
>> > >>
>> >
>> https://issues.apache.org/jira/browse/SAMZA-715?jql=project%20%3D%20SAMZA
>> >
>> >>%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20%28Resolved%2C%20
>> > >>Closed%29
>> > >> > >
>> > >> > > The vote will be open for 72 hours ( end in 12:00pm Wed,
>> 07/01/2015
>> > >>).
>> > >> > > Please download the release candidate, check the hashes/signature,
>> > >> build
>> > >> > it
>> > >> > > and test it, and then please vote:
>> > >> > >
>> > >> > > [ ] +1 approve
>> > >> > > [ ] +0 no opinion
>> > >> > > [ ] -1 disapprove (and reason why)
>> > >> > >
>> > >> >
>> > >>
>> > >>
>> > >>
>> > >> --
>> > >> Milinda Pathirage
>> > >>
>> > >> PhD Student | Research Assistant
>> > >> School of Informatics and Computing | Data to Insight Center
>> > >> Indiana University
>> > >>
>> > >> twitter: milindalakmal
>> > >> skype: milinda.pathirage
>> > >> blog: http://milinda.pathirage.org
>> > >>
>> >
>> >
>>
>
>
>
> --
> Milinda Pathirage
>
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
>
> twitter: milindalakmal
> skype: milinda.pathirage
> blog: http://milinda.pathirage.org


Re: [VOTE] Apache Samza 0.9.1 RC1

2015-07-01 Thread Jakob Homan
s/test-patch/check-all/

Guess which project I've been working on?

On 1 July 2015 at 21:22, Jakob Homan  wrote:
> +1 (binding).  Artifacts are good, tests pass, code looks good.  Found a
> minor issue with test-patch (SAMZA-726), but it doesn't affect the release.
>
> It's pretty standard to run weekend- or holiday-straddling votes for five
> days.
>
> -Jakob
>
>
> On 1 July 2015 at 20:49, Milinda Pathirage  wrote:
>> +1 for extending the voting period.
>>
>> Thanks
>> Milinda
>>
>> On Wed, Jul 1, 2015 at 8:27 PM, Yi Pan  wrote:
>>
>>> Hi, team,
>>>
>>> 72 hours have passed. So far, we got 2 binding votes and 2 non-binding
>>> votes, all +1. I am proposing to extend 1 more day to 12:00pm 7/2/2015
>>> Thursday.
>>>
>>> Does it sound good to everyone?
>>>
>>> Thanks!
>>>
>>> -Yi
>>>
>>> On Wed, Jul 1, 2015 at 12:15 PM, Navina Ramesh
>>> >> > wrote:
>>>
>>> > +1
>>> >
>>> > Verified MD5 & asc signatures. Local build & check-all.sh passed.
>>> >
>>> > Thanks!
>>> > Navina
>>> >
>>> > On 6/30/15, 7:13 PM, "Chris Riccomini"  wrote:
>>> >
>>> > >+1
>>> > >
>>> > >Verified MD5, and asc signature. Build locally, and all tests pass.
>>> > >
>>> > >Cheers,
>>> > >Chris
>>> > >
>>> > >On Tue, Jun 30, 2015 at 1:26 PM, Milinda Pathirage <
>>> mpath...@umail.iu.edu
>>> > >
>>> > >wrote:
>>> > >
>>> > >> +1 (non-binding)
>>> > >>
>>> > >> Verified signature. Tested locally using ./bin/check-all.sh.
>>> > >>
>>> > >> Thanks
>>> > >> Milinda
>>> > >>
>>> > >> On Tue, Jun 30, 2015 at 2:10 AM, Yan Fang 
>>> wrote:
>>> > >>
>>> > >> > +1
>>> > >> >
>>> > >> > Verified MD5, Signature.
>>> > >> >
>>> > >> > Tested locally.
>>> > >> >
>>> > >> > Thanks,
>>> > >> >
>>> > >> > Fang, Yan
>>> > >> > yanfang...@gmail.com
>>> > >> >
>>> > >> > On Sun, Jun 28, 2015 at 12:31 PM, Yi Pan 
>>> wrote:
>>> > >> >
>>> > >> > > Hey all,
>>> > >> > >
>>> > >> > > This is a call for a vote on a release of Apache Samza 0.9.1. This
>>> > >>is a
>>> > >> > > bug-fix release against 0.9.0.
>>> > >> > >
>>> > >> > > The release candidate can be downloaded from here:
>>> > >> > >
>>> > >> > > http://people.apache.org/~nickpan47/samza-0.9.1-rc1/
>>> > >> > >
>>> > >> > > The release candidate is signed with pgp key 911402D8, which is
>>> > >> > > included in the repository's KEYS file:
>>> > >> > >
>>> > >> > >
>>> > >> > >
>>> > >> >
>>> > >>
>>> > >>
>>> >
>>> https://git-wip-us.apache.org/repos/asf?p=samza.git;a=blob_plain;f=KEYS;h
>>> > >>b=6f5bafb6cd93934781161eb6b1868d11ea347c95
>>> > >> > >
>>> > >> > > and can also be found on keyservers:
>>> > >> > >
>>> > >> > > http://pgp.mit.edu/pks/lookup?op=get&search=0x911402D8
>>> > >> > >
>>> > >> > > The git tag is release-0.9.1-rc1 and signed with the same pgp key:
>>> > >> > >
>>> > >> > >
>>> > >> > >
>>> > >> >
>>> > >>
>>> > >>
>>> >
>>> https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=e78b9e7f34650
>>> > >>538b4bb68b338eb472b98a5709e
>>> > >> > >
>>> > >> > > Test binaries have been published to Maven's staging repository,
>>> and
>>> > >> are
>>> > >> > > available here:
>>> > >> > >
>>> > >>
>>> https://repository.apache.org/content/repositories/orgapachesamza-1007/
>>> > >> > >
>>> > >> > > Note release 0.9.1 is still supporting JDK6 and the binaries were
>>> > >>built
>>> > >> > > with JDK6 without incident.
>>> > >> > >
>>> > >> > > 6 critical bugs were resolved for this release:
>>> > >> > >
>>> > >> > >
>>> > >> > >
>>> > >> >
>>> > >>
>>> > >>
>>> >
>>> https://issues.apache.org/jira/browse/SAMZA-715?jql=project%20%3D%20SAMZA
>>> >
>>> >>%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20%28Resolved%2C%20
>>> > >>Closed%29
>>> > >> > >
>>> > >> > > The vote will be open for 72 hours ( end in 12:00pm Wed,
>>> 07/01/2015
>>> > >>).
>>> > >> > > Please download the release candidate, check the hashes/signature,
>>> > >> build
>>> > >> > it
>>> > >> > > and test it, and then please vote:
>>> > >> > >
>>> > >> > > [ ] +1 approve
>>> > >> > > [ ] +0 no opinion
>>> > >> > > [ ] -1 disapprove (and reason why)
>>> > >> > >
>>> > >> >
>>> > >>
>>> > >>
>>> > >>
>>> > >> --
>>> > >> Milinda Pathirage
>>> > >>
>>> > >> PhD Student | Research Assistant
>>> > >> School of Informatics and Computing | Data to Insight Center
>>> > >> Indiana University
>>> > >>
>>> > >> twitter: milindalakmal
>>> > >> skype: milinda.pathirage
>>> > >> blog: http://milinda.pathirage.org
>>> > >>
>>> >
>>> >
>>>
>>
>>
>>
>> --
>> Milinda Pathirage
>>
>> PhD Student | Research Assistant
>> School of Informatics and Computing | Data to Insight Center
>> Indiana University
>>
>> twitter: milindalakmal
>> skype: milinda.pathirage
>> blog: http://milinda.pathirage.org


Re: [VOTE] Apache Samza 0.9.1 RC1

2015-07-01 Thread Milinda Pathirage
+1 for extending the voting period.

Thanks
Milinda

On Wed, Jul 1, 2015 at 8:27 PM, Yi Pan  wrote:

> Hi, team,
>
> 72 hours have passed. So far, we got 2 binding votes and 2 non-binding
> votes, all +1. I am proposing to extend 1 more day to 12:00pm 7/2/2015
> Thursday.
>
> Does it sound good to everyone?
>
> Thanks!
>
> -Yi
>
> On Wed, Jul 1, 2015 at 12:15 PM, Navina Ramesh
>  > wrote:
>
> > +1
> >
> > Verified MD5 & asc signatures. Local build & check-all.sh passed.
> >
> > Thanks!
> > Navina
> >
> > On 6/30/15, 7:13 PM, "Chris Riccomini"  wrote:
> >
> > >+1
> > >
> > >Verified MD5, and asc signature. Build locally, and all tests pass.
> > >
> > >Cheers,
> > >Chris
> > >
> > >On Tue, Jun 30, 2015 at 1:26 PM, Milinda Pathirage <
> mpath...@umail.iu.edu
> > >
> > >wrote:
> > >
> > >> +1 (non-binding)
> > >>
> > >> Verified signature. Tested locally using ./bin/check-all.sh.
> > >>
> > >> Thanks
> > >> Milinda
> > >>
> > >> On Tue, Jun 30, 2015 at 2:10 AM, Yan Fang 
> wrote:
> > >>
> > >> > +1
> > >> >
> > >> > Verified MD5, Signature.
> > >> >
> > >> > Tested locally.
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Fang, Yan
> > >> > yanfang...@gmail.com
> > >> >
> > >> > On Sun, Jun 28, 2015 at 12:31 PM, Yi Pan 
> wrote:
> > >> >
> > >> > > Hey all,
> > >> > >
> > >> > > This is a call for a vote on a release of Apache Samza 0.9.1. This
> > >>is a
> > >> > > bug-fix release against 0.9.0.
> > >> > >
> > >> > > The release candidate can be downloaded from here:
> > >> > >
> > >> > > http://people.apache.org/~nickpan47/samza-0.9.1-rc1/
> > >> > >
> > >> > > The release candidate is signed with pgp key 911402D8, which is
> > >> > > included in the repository's KEYS file:
> > >> > >
> > >> > >
> > >> > >
> > >> >
> > >>
> > >>
> >
> https://git-wip-us.apache.org/repos/asf?p=samza.git;a=blob_plain;f=KEYS;h
> > >>b=6f5bafb6cd93934781161eb6b1868d11ea347c95
> > >> > >
> > >> > > and can also be found on keyservers:
> > >> > >
> > >> > > http://pgp.mit.edu/pks/lookup?op=get&search=0x911402D8
> > >> > >
> > >> > > The git tag is release-0.9.1-rc1 and signed with the same pgp key:
> > >> > >
> > >> > >
> > >> > >
> > >> >
> > >>
> > >>
> >
> https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=e78b9e7f34650
> > >>538b4bb68b338eb472b98a5709e
> > >> > >
> > >> > > Test binaries have been published to Maven's staging repository,
> and
> > >> are
> > >> > > available here:
> > >> > >
> > >>
> https://repository.apache.org/content/repositories/orgapachesamza-1007/
> > >> > >
> > >> > > Note release 0.9.1 is still supporting JDK6 and the binaries were
> > >>built
> > >> > > with JDK6 without incident.
> > >> > >
> > >> > > 6 critical bugs were resolved for this release:
> > >> > >
> > >> > >
> > >> > >
> > >> >
> > >>
> > >>
> >
> https://issues.apache.org/jira/browse/SAMZA-715?jql=project%20%3D%20SAMZA
> >
> >>%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20%28Resolved%2C%20
> > >>Closed%29
> > >> > >
> > >> > > The vote will be open for 72 hours ( end in 12:00pm Wed,
> 07/01/2015
> > >>).
> > >> > > Please download the release candidate, check the hashes/signature,
> > >> build
> > >> > it
> > >> > > and test it, and then please vote:
> > >> > >
> > >> > > [ ] +1 approve
> > >> > > [ ] +0 no opinion
> > >> > > [ ] -1 disapprove (and reason why)
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> Milinda Pathirage
> > >>
> > >> PhD Student | Research Assistant
> > >> School of Informatics and Computing | Data to Insight Center
> > >> Indiana University
> > >>
> > >> twitter: milindalakmal
> > >> skype: milinda.pathirage
> > >> blog: http://milinda.pathirage.org
> > >>
> >
> >
>



-- 
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org


Re: Samza and sliding window

2015-07-01 Thread Shekar Tippur
Yi,

Here is my config file:
http://pastebin.com/Kf3C9E0h

- S


Re: Samza and sliding window

2015-07-01 Thread Yi Pan
Hi, Shekar,

Could you attach the complete config file here? It is hard to debug from
just snippets of your config file.

Thanks!

-Yi

On Wed, Jul 1, 2015 at 5:59 PM, Shekar Tippur  wrote:

> Sorry, after re-reading the docs,
>
> https://samza.apache.org/learn/documentation/0.8/jobs/configuration-table.html#stores-key-serde
>
> I have changed it to
>
> # Define serde
>
>
> stores.store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
>
> stores.store.key.serde=byte
>
> stores.store.msg.serde=string
>
> stores.store.changelog=kafka.argos-parser
>
> Is this correct? I am still getting the same exception.
>
> - Shekar
>
>
>
> On Wed, Jul 1, 2015 at 5:37 PM, Shekar Tippur  wrote:
>
> > I do have this in init as well...
> >
> > public void init(Config config, TaskContext context) {
> >
> >  store = (KeyValueStore) context.getStore("store");
> >
> > }
> > You are right. These are primitive types but I was trying to address this
> > exception:
> >
> > Exception in thread "main" org.apache.samza.SamzaException: Must define a
> > key serde when using key value storage.
> >
> > I have changed it to
> >
> > stores.store.key.serde=org.apache.samza.serializers.ByteSerdeFactory
> >
> > Regardless, exception is persisting.
> >
> > - Shekar
> >
>


Re: Samza and sliding window

2015-07-01 Thread Shekar Tippur
Sorry, after re-reading the docs,
https://samza.apache.org/learn/documentation/0.8/jobs/configuration-table.html#stores-key-serde

I have changed it to

# Define serde

stores.store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory

stores.store.key.serde=byte

stores.store.msg.serde=string

stores.store.changelog=kafka.argos-parser

Is this correct? I am still getting the same exception.
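
One way to narrow this down is to check the config mechanically. The sketch
below is a hypothetical helper, not part of Samza. It assumes (as the
serializers.registry question elsewhere in this thread suggests) that every
name used in stores.<store>.key.serde or stores.<store>.msg.serde must have a
matching serializers.registry.<name>.class entry, and that store names contain
no dots. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Samza: reports stores whose serde
// settings are missing or refer to a serde name with no registry entry.
class StoreSerdeCheck {

    // Returns one human-readable problem per misconfigured store setting.
    static List<String> findProblems(Properties config) {
        // Collect store names from every "stores.<name>.<setting>" key.
        Set<String> stores = new TreeSet<>();
        for (String key : config.stringPropertyNames()) {
            if (key.startsWith("stores.")) {
                String rest = key.substring("stores.".length());
                int dot = rest.indexOf('.');
                if (dot > 0) {
                    stores.add(rest.substring(0, dot));
                }
            }
        }
        List<String> problems = new ArrayList<>();
        for (String store : stores) {
            for (String setting : new String[] {"key.serde", "msg.serde"}) {
                String key = "stores." + store + "." + setting;
                String serdeName = config.getProperty(key);
                if (serdeName == null) {
                    problems.add(key + " is not set");
                } else if (config.getProperty(
                        "serializers.registry." + serdeName + ".class") == null) {
                    // Assumption: a serde name must be registered to be usable.
                    problems.add(key + "=" + serdeName
                            + " has no serializers.registry entry");
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("serializers.registry.byte.class",
                "org.apache.samza.serializers.ByteSerdeFactory");
        config.setProperty("stores.store.factory",
                "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
        config.setProperty("stores.store.key.serde", "byte");
        // "string" is deliberately left unregistered in this example.
        config.setProperty("stores.store.msg.serde", "string");
        for (String problem : findProblems(config)) {
            System.out.println(problem);
        }
    }
}
```

A check like this would also flag a store-name mismatch, for example serdes
defined under stores.store-name.* while the factory is defined under
stores.store.*.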

- Shekar



On Wed, Jul 1, 2015 at 5:37 PM, Shekar Tippur  wrote:

> I do have this in init as well...
>
> public void init(Config config, TaskContext context) {
>
>  store = (KeyValueStore) context.getStore("store");
>
> }
> You are right. These are primitive types but I was trying to address this
> exception:
>
> Exception in thread "main" org.apache.samza.SamzaException: Must define a
> key serde when using key value storage.
>
> I have changed it to
>
> stores.store.key.serde=org.apache.samza.serializers.ByteSerdeFactory
>
> Regardless, exception is persisting.
>
> - Shekar
>


Re: Thoughts and obesrvations on Samza

2015-07-01 Thread Guozhang Wang
Read through the code example and it looks good to me. A few thoughts
regarding deployment:

Today Samza deploys as an executable, run like this:

deploy/samza/bin/run-job.sh --config-factory=... --config-path=file://...

This proposal advocates deploying Samza more as an embedded library
in user application code (ignoring the terminology, since it is not the same
as in the prototype code):

StreamTask task = new MyStreamTask(configs);
Thread thread = new Thread(task);
thread.start();

I think both of these deployment modes are important for different types of
users. That said, I think making Samza purely standalone is still
sufficient for either runnable or library modes.
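
To make the library mode concrete, here is a minimal, self-contained sketch.
Everything in it is an assumption for illustration: MessageHandler stands in
for a stream task (it is not Samza's StreamTask interface), a BlockingQueue
stands in for the Kafka consumer, and the STOP sentinel is just a simple way
to end the loop. The point is only that the application creates and owns the
processing thread.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a stream task; not Samza's StreamTask interface.
interface MessageHandler {
    void process(String message, List<String> output);
}

// Wraps a handler in a Runnable so the application owns the thread lifecycle.
class EmbeddedRunner implements Runnable {
    // Sentinel message that tells the loop to stop (illustrative only).
    static final String STOP = "__stop__";

    private final BlockingQueue<String> input;
    private final MessageHandler handler;
    private final List<String> output = new CopyOnWriteArrayList<>();

    EmbeddedRunner(BlockingQueue<String> input, MessageHandler handler) {
        this.input = input;
        this.handler = handler;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // Block until the next message arrives on the in-memory "topic".
                String message = input.take();
                if (STOP.equals(message)) {
                    return;
                }
                handler.process(message, output);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and exit the loop.
            Thread.currentThread().interrupt();
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> input = new LinkedBlockingQueue<>();
        EmbeddedRunner runner =
                new EmbeddedRunner(input, (msg, out) -> out.add(msg.toUpperCase()));
        // The application, not a job launcher, starts the processing thread.
        Thread thread = new Thread(runner);
        thread.start();
        input.put("a");
        input.put("b");
        input.put(STOP);
        thread.join();
        System.out.println(runner.output());
    }
}
```

The run-job.sh style of deployment could then be a thin main() around the
same runner, which is one way to read the claim that a standalone core can
serve both modes.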

Guozhang

On Tue, Jun 30, 2015 at 11:33 PM, Jay Kreps  wrote:

> Looks like gmail mangled the code example, it was supposed to look like
> this:
>
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:4242");
> StreamingConfig config = new StreamingConfig(props);
> config.subscribe("test-topic-1", "test-topic-2");
> config.processor(ExampleStreamProcessor.class);
> config.serialization(new StringSerializer(), new StringDeserializer());
> KafkaStreaming container = new KafkaStreaming(config);
> container.run();
>
> -Jay
>
> On Tue, Jun 30, 2015 at 11:32 PM, Jay Kreps  wrote:
>
> > Hey guys,
> >
> > This came out of some conversations Chris and I were having around
> whether
> > it would make sense to use Samza as a kind of data ingestion framework
> for
> > Kafka (which ultimately led to KIP-26 "copycat"). This kind of combined
> > with complaints around config and YARN and the discussion around how to
> > best do a standalone mode.
> >
> > So the thought experiment was, given that Samza was basically already
> > totally Kafka specific, what if you just embraced that and turned it into
> > something less like a heavyweight framework and more like a third Kafka
> > client--a kind of "producing consumer" with state management facilities.
> > Basically a library. Instead of a complex stream processing framework
> this
> > would actually be a very simple thing, not much more complicated to use
> or
> > operate than a Kafka consumer. As Chris said, when we thought about it, a
> > lot of what Samza (and the other stream processing systems) was doing
> > seemed like kind of a hangover from MapReduce.
> >
> > Of course you need to ingest/output data to and from the stream
> > processing. But when we actually looked into how that would work, Samza
> > isn't really an ideal data ingestion framework for a bunch of reasons. To
> > really do that right you need a pretty different internal data model and
> > set of apis. So what if you split them and had an api for Kafka
> > ingress/egress (copycat AKA KIP-26) and a separate api for Kafka
> > transformation (Samza).
> >
> > This would also allow really embracing the same terminology and
> > conventions. One complaint about the current state is that the two
> systems
> > kind of feel bolted on. Terminology like "stream" vs "topic" and
> different
> > config and monitoring systems means you kind of have to learn Kafka's
> way,
> > then learn Samza's slightly different way, then kind of understand how
> they
> > map to each other, which having walked a few people through this is
> > surprisingly tricky for folks to get.
> >
> > Since I have been spending a lot of time on airplanes I hacked up an
> > earnest but still somewhat incomplete prototype of what this would look
> > like. This is just unceremoniously dumped into Kafka as it required a few
> > changes to the new consumer. Here is the code:
> >
> >
> https://github.com/jkreps/kafka/tree/streams/clients/src/main/java/org/apache/kafka/clients/streaming
> >
> > For the purpose of the prototype I just liberally renamed everything to
> > try to align it with Kafka with no regard for compatibility.
> >
> > To use this would be something like this:
> > Properties props = new Properties(); props.put("bootstrap.servers",
> > "localhost:4242"); StreamingConfig config = new StreamingConfig(props);
> config.subscribe("test-topic-1",
> > "test-topic-2"); config.processor(ExampleStreamProcessor.class);
> config.serialization(new
> > StringSerializer(), new StringDeserializer()); KafkaStreaming container =
> > new KafkaStreaming(config); container.run();
> >
> > KafkaStreaming is basically the SamzaContainer; StreamProcessor is
> > basically StreamTask.
> >
> > So rather than putting all the class names in a file and then having the
> > job assembled by reflection, you just instantiate the container
> > programmatically. Work is balanced over however many instances of this
> are
> > alive at any time (i.e. if an instance dies, new tasks are added to the
> > existing containers without shutting them down).
> >
> > We would provide some glue for running this stuff in YARN via Slider,
> > Mesos via Marathon, and AWS using some of their tools but from the point
> of
> > view of these frameworks these stream processing jobs are just stateless
> 

Re: Samza and sliding window

2015-07-01 Thread Shekar Tippur
I do have this in init as well...

public void init(Config config, TaskContext context) {

 store = (KeyValueStore) context.getStore("store");

}
You are right. These are primitive types, but I was trying to address this
exception:

Exception in thread "main" org.apache.samza.SamzaException: Must define a
key serde when using key value storage.

I have changed it to

stores.store.key.serde=org.apache.samza.serializers.ByteSerdeFactory

Regardless, the exception persists.

- Shekar


Re: [VOTE] Apache Samza 0.9.1 RC1

2015-07-01 Thread Yi Pan
Hi, team,

72 hours have passed. So far we have 2 binding votes and 2 non-binding
votes, all +1. I propose extending the vote by one more day, to 12:00pm
Thursday, 7/2/2015.

Does that sound good to everyone?

Thanks!

-Yi

On Wed, Jul 1, 2015 at 12:15 PM, Navina Ramesh  wrote:

> +1
>
> Verified MD5 & asc signatures. Local build & check-all.sh passed.
>
> Thanks!
> Navina
>
> On 6/30/15, 7:13 PM, "Chris Riccomini"  wrote:
>
> >+1
> >
> >Verified MD5, and asc signature. Build locally, and all tests pass.
> >
> >Cheers,
> >Chris
> >
> >On Tue, Jun 30, 2015 at 1:26 PM, Milinda Pathirage  >
> >wrote:
> >
> >> +1 (non-binding)
> >>
> >> Verified signature. Tested locally using ./bin/check-all.sh.
> >>
> >> Thanks
> >> Milinda
> >>
> >> On Tue, Jun 30, 2015 at 2:10 AM, Yan Fang  wrote:
> >>
> >> > +1
> >> >
> >> > Verified MD5, Signature.
> >> >
> >> > Tested locally.
> >> >
> >> > Thanks,
> >> >
> >> > Fang, Yan
> >> > yanfang...@gmail.com
> >> >
> >> > On Sun, Jun 28, 2015 at 12:31 PM, Yi Pan  wrote:
> >> >
> >> > > Hey all,
> >> > >
> >> > > This is a call for a vote on a release of Apache Samza 0.9.1. This
> >>is a
> >> > > bug-fix release against 0.9.0.
> >> > >
> >> > > The release candidate can be downloaded from here:
> >> > >
> >> > > http://people.apache.org/~nickpan47/samza-0.9.1-rc1/
> >> > >
> >> > > The release candidate is signed with pgp key 911402D8, which is
> >> > > included in the repository's KEYS file:
> >> > >
> >> > >
> >> > >
> >> >
> >>
> >>
> https://git-wip-us.apache.org/repos/asf?p=samza.git;a=blob_plain;f=KEYS;h
> >>b=6f5bafb6cd93934781161eb6b1868d11ea347c95
> >> > >
> >> > > and can also be found on keyservers:
> >> > >
> >> > > http://pgp.mit.edu/pks/lookup?op=get&search=0x911402D8
> >> > >
> >> > > The git tag is release-0.9.1-rc1 and signed with the same pgp key:
> >> > >
> >> > >
> >> > >
> >> >
> >>
> >>
> https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=e78b9e7f34650
> >>538b4bb68b338eb472b98a5709e
> >> > >
> >> > > Test binaries have been published to Maven's staging repository, and
> >> are
> >> > > available here:
> >> > >
> >> https://repository.apache.org/content/repositories/orgapachesamza-1007/
> >> > >
> >> > > Note release 0.9.1 is still supporting JDK6 and the binaries were
> >>built
> >> > > with JDK6 without incident.
> >> > >
> >> > > 6 critical bugs were resolved for this release:
> >> > >
> >> > >
> >> > >
> >> >
> >>
> >>
> https://issues.apache.org/jira/browse/SAMZA-715?jql=project%20%3D%20SAMZA
> >>%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20%28Resolved%2C%20
> >>Closed%29
> >> > >
> >> > > The vote will be open for 72 hours ( end in 12:00pm Wed, 07/01/2015
> >>).
> >> > > Please download the release candidate, check the hashes/signature,
> >> build
> >> > it
> >> > > and test it, and then please vote:
> >> > >
> >> > > [ ] +1 approve
> >> > > [ ] +0 no opinion
> >> > > [ ] -1 disapprove (and reason why)
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> Milinda Pathirage
> >>
> >> PhD Student | Research Assistant
> >> School of Informatics and Computing | Data to Insight Center
> >> Indiana University
> >>
> >> twitter: milindalakmal
> >> skype: milinda.pathirage
> >> blog: http://milinda.pathirage.org
> >>
>
>


Re: Samza and sliding window

2015-07-01 Thread Yi Pan
Hi, Shekar,

This is probably what's missing:

private KeyValueStore store;

public void init(Config config, TaskContext context) {
   ...
   store = (KeyValueStore) context.getStore("store");
   ...
}

Also, it looks like the key and value data types for your store are
primitive types. Why do you want to use the JSON serde?

-Yi

On Wed, Jul 1, 2015 at 4:15 PM, Shekar Tippur  wrote:

> Sorry .. That was a copy paste issue
>
> This is what I have
>
> stores.store.key.serde=org.apache.samza.serializers.JsonSerdeFactory
>
> stores.store.msg.serde=org.apache.samza.serializers.JsonSerdeFactory
>
> stores.store.changelog=argos.windowchangelog
>
>
> and this is how i am initializing it
>
> private KeyValueStore store;
>
> On Wed, Jul 1, 2015 at 3:55 PM, Yan Fang  wrote:
>
> > So do you use the "store-name" as the kv storage name in your StreamTask
> > code?
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Wed, Jul 1, 2015 at 3:41 PM, Shekar Tippur  wrote:
> >
> > > Yan,
> > >
> > > yes. I do have it.
> > >
> > > - Shekar
> > >
> > > On Wed, Jul 1, 2015 at 3:09 PM, Yan Fang  wrote:
> > >
> > > > Do you have
> > > >
> > > > serializers.registry.json.class
> > > > =org.apache.samza.serializers.JsonSerdeFactory
> > > >
> > > > in your config file?
> > > >
> > > >
> > > > Fang, Yan
> > > > yanfang...@gmail.com
> > > >
> > > > On Wed, Jul 1, 2015 at 2:59 PM, Shekar Tippur 
> > wrote:
> > > >
> > > > > Yi/Milinda,
> > > > >
> > > > > I am trying to initialize a kv store. I have the following
> properties
> > > > > defined:
> > > > >
> > > > > stores.store-name.key.serde=json
> > > > >
> > > > > stores.store-name.msg.serde=json
> > > > >
> > > > > stores.store-name.changelog=argos.windowchangelog
> > > > > How do I define a key serde as I am getting this exception:
> > > > >
> > > > > Exception in thread "main" org.apache.samza.SamzaException: Must
> > > define a
> > > > > key serde when using key value storage.
> > > > >
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.samza.storage.kv.BaseKeyValueStorageEngineFactory$class.getStorageEngine(BaseKeyValueStorageEngineFactory.scala:86)
> > > > >
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory.getStorageEngine(RocksDbKeyValueStorageEngineFactory.scala:28)
> > > > >
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:455)
> > > > >
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:439)
> > > > >
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > > > >
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > > > >
> > > > > at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> > > > >
> > > > > at
> > > scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> > > > >
> > > > > at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> > > > >
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:439)
> > > > >
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:416)
> > > > >
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > > > >
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > > > >
> > > > > at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> > > > >
> > > > > at
> > > scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> > > > >
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
> > > > >
> > > > > at scala.collection.SetLike$class.map(SetLike.scala:93)
> > > > >
> > > > > at scala.collection.AbstractSet.map(Set.scala:47)
> > > > >
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:416)
> > > > >
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:63)
> > > > >
> > > > > at org.apache.samza.job.JobRunner.run(JobRunner.scala:62)
> > > > >
> > > > > at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
> > > > >
> > > > > at org.apache.samza.job.JobRunner.main(JobRunner.scala)
> > > > >
> > > > > On Mon, Jun 29, 2015 at 12:41 PM, Shekar Tippur  >
> > > > wrote:
> > > > >
> > > > > > Yi,
> > > > > >
> > > > > > My use case is more of the latter. Your explanation makes sense
> > now.
> > > I
> > > > > was
> > > > > > also loo

Re: Samza and sliding window

2015-07-01 Thread Shekar Tippur
Sorry, that was a copy-paste issue.

This is what I have:

stores.store.key.serde=org.apache.samza.serializers.JsonSerdeFactory
stores.store.msg.serde=org.apache.samza.serializers.JsonSerdeFactory
stores.store.changelog=argos.windowchangelog

and this is how I am initializing it:

private KeyValueStore store;

On Wed, Jul 1, 2015 at 3:55 PM, Yan Fang  wrote:

> So do you use the "store-name" as the kv storage name in your StreamTask
> code?
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Wed, Jul 1, 2015 at 3:41 PM, Shekar Tippur  wrote:
>
> > Yan,
> >
> > yes. I do have it.
> >
> > - Shekar
> >
> > On Wed, Jul 1, 2015 at 3:09 PM, Yan Fang  wrote:
> >
> > > Do you have
> > >
> > > serializers.registry.json.class
> > > =org.apache.samza.serializers.JsonSerdeFactory
> > >
> > > in your config file?
> > >
> > >
> > > Fang, Yan
> > > yanfang...@gmail.com
> > >
> > > On Wed, Jul 1, 2015 at 2:59 PM, Shekar Tippur 
> wrote:
> > >
> > > > Yi/Milinda,
> > > >
> > > > I am trying to initialize a kv store. I have the following properties
> > > > defined:
> > > >
> > > > stores.store-name.key.serde=json
> > > >
> > > > stores.store-name.msg.serde=json
> > > >
> > > > stores.store-name.changelog=argos.windowchangelog
> > > > How do I define a key serde as I am getting this exception:
> > > >
> > > > Exception in thread "main" org.apache.samza.SamzaException: Must
> > define a
> > > > key serde when using key value storage.
> > > >
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.samza.storage.kv.BaseKeyValueStorageEngineFactory$class.getStorageEngine(BaseKeyValueStorageEngineFactory.scala:86)
> > > >
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory.getStorageEngine(RocksDbKeyValueStorageEngineFactory.scala:28)
> > > >
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:455)
> > > >
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:439)
> > > >
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > > >
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > > >
> > > > at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> > > >
> > > > at
> > scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> > > >
> > > > at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> > > >
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:439)
> > > >
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:416)
> > > >
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > > >
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > > >
> > > > at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> > > >
> > > > at
> > scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> > > >
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
> > > >
> > > > at scala.collection.SetLike$class.map(SetLike.scala:93)
> > > >
> > > > at scala.collection.AbstractSet.map(Set.scala:47)
> > > >
> > > > at
> > > >
> > >
> >
> org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:416)
> > > >
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:63)
> > > >
> > > > at org.apache.samza.job.JobRunner.run(JobRunner.scala:62)
> > > >
> > > > at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
> > > >
> > > > at org.apache.samza.job.JobRunner.main(JobRunner.scala)
> > > >
> > > > On Mon, Jun 29, 2015 at 12:41 PM, Shekar Tippur 
> > > wrote:
> > > >
> > > > > Yi,
> > > > >
> > > > > My use case is more of the latter. Your explanation makes sense
> now.
> > I
> > > > was
> > > > > also looking into Milinda's wiki. She has a section for Kafka
> > > > > partition SimplePartitioner, which is simple enough as well.
> > > > >
> > > > > Thanks for all the inputs. Let me see what I come up with while
> > > > > implementing it.
> > > > >
> > > > > - Shekar
> > > > >
> > > > > On Mon, Jun 29, 2015 at 10:42 AM, Yi Pan 
> > wrote:
> > > > >
> > > > >> Hi, Shekar,
> > > > >>
> > > > >> First, I would like to clarify what you meant by sliding window:
> is
> > it
> > > > >> defined as windows with size N and advance step size of 1 (which
> > means
> > > > >> that
> > > > >> windows overlap and each input message would contribute to
> multiple
> > > > counts
> > > > >> in different windows)? Or windows with size N and advance step
> size
> > > of N
> > > > >> (i.e

Re: Samza and sliding window

2015-07-01 Thread Yan Fang
So do you use the "store-name" as the kv storage name in your StreamTask
code?

Fang, Yan
yanfang...@gmail.com

On Wed, Jul 1, 2015 at 3:41 PM, Shekar Tippur  wrote:

> Yan,
>
> yes. I do have it.
>
> - Shekar
>
> On Wed, Jul 1, 2015 at 3:09 PM, Yan Fang  wrote:
>
> > Do you have
> >
> > serializers.registry.json.class
> > =org.apache.samza.serializers.JsonSerdeFactory
> >
> > in your config file?
> >
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Wed, Jul 1, 2015 at 2:59 PM, Shekar Tippur  wrote:
> >
> > > Yi/Milinda,
> > >
> > > I am trying to initialize a kv store. I have the following properties
> > > defined:
> > >
> > > stores.store-name.key.serde=json
> > >
> > > stores.store-name.msg.serde=json
> > >
> > > stores.store-name.changelog=argos.windowchangelog
> > > How do I define a key serde as I am getting this exception:
> > >
> > > Exception in thread "main" org.apache.samza.SamzaException: Must
> define a
> > > key serde when using key value storage.
> > >
> > > at
> > >
> > >
> >
> org.apache.samza.storage.kv.BaseKeyValueStorageEngineFactory$class.getStorageEngine(BaseKeyValueStorageEngineFactory.scala:86)
> > >
> > > at
> > >
> > >
> >
> org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory.getStorageEngine(RocksDbKeyValueStorageEngineFactory.scala:28)
> > >
> > > at
> > >
> > >
> >
> org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:455)
> > >
> > > at
> > >
> > >
> >
> org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:439)
> > >
> > > at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > >
> > > at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > >
> > > at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> > >
> > > at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> > >
> > > at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> > >
> > > at
> > >
> > >
> >
> org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:439)
> > >
> > > at
> > >
> > >
> >
> org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:416)
> > >
> > > at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > >
> > > at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > >
> > > at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> > >
> > > at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> > >
> > > at
> > >
> > >
> >
> scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
> > >
> > > at scala.collection.SetLike$class.map(SetLike.scala:93)
> > >
> > > at scala.collection.AbstractSet.map(Set.scala:47)
> > >
> > > at
> > >
> >
> org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:416)
> > >
> > > at
> > >
> > >
> >
> org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:63)
> > >
> > > at org.apache.samza.job.JobRunner.run(JobRunner.scala:62)
> > >
> > > at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
> > >
> > > at org.apache.samza.job.JobRunner.main(JobRunner.scala)
> > >
> > > On Mon, Jun 29, 2015 at 12:41 PM, Shekar Tippur 
> > wrote:
> > >
> > > > Yi,
> > > >
> > > > My use case is more of the latter. Your explanation makes sense now.
> I
> > > was
> > > > also looking into Milinda's wiki. She has a section for Kafka
> > > > partition SimplePartitioner, which is simple enough as well.
> > > >
> > > > Thanks for all the inputs. Let me see what I come up with while
> > > > implementing it.
> > > >
> > > > - Shekar
> > > >
> > > > On Mon, Jun 29, 2015 at 10:42 AM, Yi Pan 
> wrote:
> > > >
> > > >> Hi, Shekar,
> > > >>
> > > >> First, I would like to clarify what you meant by sliding window: is
> it
> > > >> defined as windows with size N and advance step size of 1 (which
> means
> > > >> that
> > > >> windows overlap and each input message would contribute to multiple
> > > counts
> > > >> in different windows)? Or windows with size N and advance step size
> > of N
> > > >> (i.e. each incoming message only contribute to one counter in a
> single
> > > >> window)?
> > > >>
> > > >> If your use case falls into the first category, you will need
> > something
> > > >> more sophisticated as discussed in SAMZA-552. If your use case is
> the
> > > >> second one, there could be a simpler version of SAMZA-552 that you
> can
> > > go
> > > >> with:
> > > >>
> > > >> 1) Initiate a KV-store that uses the application name as the key
> > > >> 2) For each incoming message, look for the windows that the message
> by
> > > the
> > > >> application name
> > > >> 3) Update the counter and update the value in the KV-store based on
> > the
> > > >> application name
> > > >> 4) Every 5 min when window() method is triggered, set all 

Re: Samza and sliding window

2015-07-01 Thread Shekar Tippur
Yan,

yes. I do have it.

- Shekar

On Wed, Jul 1, 2015 at 3:09 PM, Yan Fang  wrote:

> Do you have
>
> serializers.registry.json.class
> =org.apache.samza.serializers.JsonSerdeFactory
>
> in your config file?
>
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Wed, Jul 1, 2015 at 2:59 PM, Shekar Tippur  wrote:
>
> > Yi/Milinda,
> >
> > I am trying to initialize a kv store. I have the following properties
> > defined:
> >
> > stores.store-name.key.serde=json
> >
> > stores.store-name.msg.serde=json
> >
> > stores.store-name.changelog=argos.windowchangelog
> > How do I define a key serde as I am getting this exception:
> >
> > Exception in thread "main" org.apache.samza.SamzaException: Must define a
> > key serde when using key value storage.
> >
> > at
> >
> >
> org.apache.samza.storage.kv.BaseKeyValueStorageEngineFactory$class.getStorageEngine(BaseKeyValueStorageEngineFactory.scala:86)
> >
> > at
> >
> >
> org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory.getStorageEngine(RocksDbKeyValueStorageEngineFactory.scala:28)
> >
> > at
> >
> >
> org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:455)
> >
> > at
> >
> >
> org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:439)
> >
> > at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >
> > at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >
> > at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> >
> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >
> > at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> >
> > at
> >
> >
> org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:439)
> >
> > at
> >
> >
> org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:416)
> >
> > at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >
> > at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >
> > at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> >
> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >
> > at
> >
> >
> scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
> >
> > at scala.collection.SetLike$class.map(SetLike.scala:93)
> >
> > at scala.collection.AbstractSet.map(Set.scala:47)
> >
> > at
> >
> org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:416)
> >
> > at
> >
> >
> org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:63)
> >
> > at org.apache.samza.job.JobRunner.run(JobRunner.scala:62)
> >
> > at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
> >
> > at org.apache.samza.job.JobRunner.main(JobRunner.scala)
> >
> > On Mon, Jun 29, 2015 at 12:41 PM, Shekar Tippur 
> wrote:
> >
> > > Yi,
> > >
> > > My use case is more of the latter. Your explanation makes sense now. I
> > was
> > > also looking into Milinda's wiki. She has a section for Kafka
> > > partition SimplePartitioner, which is simple enough as well.
> > >
> > > Thanks for all the inputs. Let me see what I come up with while
> > > implementing it.
> > >
> > > - Shekar
> > >
> > > On Mon, Jun 29, 2015 at 10:42 AM, Yi Pan  wrote:
> > >
> > >> Hi, Shekar,
> > >>
> > >> First, I would like to clarify what you meant by sliding window: is it
> > >> defined as windows with size N and advance step size of 1 (which means
> > >> that
> > >> windows overlap and each input message would contribute to multiple
> > counts
> > >> in different windows)? Or windows with size N and advance step size
> of N
> > >> (i.e. each incoming message only contribute to one counter in a single
> > >> window)?
> > >>
> > >> If your use case falls into the first category, you will need
> something
> > >> more sophisticated as discussed in SAMZA-552. If your use case is the
> > >> second one, there could be a simpler version of SAMZA-552 that you can
> > go
> > >> with:
> > >>
> > >> 1) Initiate a KV-store that uses the application name as the key
> > >> 2) For each incoming message, look for the windows that the message by
> > the
> > >> application name
> > >> 3) Update the counter and update the value in the KV-store based on
> the
> > >> application name
> > >> 4) Every 5 min when window() method is triggered, set all counters to
> > zero
> > >> (this can be done in a lazy way as well, by keeping the last reset
> > >> timestamp in the record in the KV-store, keyed by application name.
> > Then,
> > >> resetting counter to zero can be done when next time the application
> > >> counter is updated again)
> > >>
> > >> Hope that makes sense.
> > >>
> > >> -Yi
> > >>
> > >> On Mon, Jun 29, 2015 at 10:06 AM, Shekar Tippur 
> > >> wrote:
> > >>
> > >> > Benjamin,
> > >> >
> > >> > Thanks for the explanation. We dont have any specific partition
> scheme
>

Re: Samza and sliding window

2015-07-01 Thread Yan Fang
Do you have

serializers.registry.json.class
=org.apache.samza.serializers.JsonSerdeFactory

in your config file?


Fang, Yan
yanfang...@gmail.com
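
For readers following the thread: in Samza's configuration, the value of a store's key.serde or msg.serde is expected to be a name registered under serializers.registry.<name>.class, not a factory class name. A minimal sketch of a sanity check over a job's properties is below. SerdeConfigCheck and unregisteredSerdes are hypothetical helper names for illustration, not part of Samza, and the check only looks at property names and values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical helper, not a Samza class.
class SerdeConfigCheck {
    /** Returns one message per store serde property whose value is not a registered serde name. */
    public static List<String> unregisteredSerdes(Properties config) {
        List<String> problems = new ArrayList<>();
        // TreeSet gives a stable, sorted iteration order over the property names.
        for (String key : new TreeSet<>(config.stringPropertyNames())) {
            boolean isStoreSerde = key.startsWith("stores.")
                    && (key.endsWith(".key.serde") || key.endsWith(".msg.serde"));
            if (!isStoreSerde) {
                continue;
            }
            String serdeName = config.getProperty(key);
            // A serde name counts as registered when serializers.registry.<name>.class is set.
            String registryKey = "serializers.registry." + serdeName + ".class";
            if (config.getProperty(registryKey) == null) {
                problems.add(key + " refers to unregistered serde '" + serdeName + "'");
            }
        }
        return problems;
    }
}
```

Run against the first set of properties in this thread plus the registry line above, the check should report nothing; run against properties that put the factory class name directly in key.serde, it should flag that property.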

On Wed, Jul 1, 2015 at 2:59 PM, Shekar Tippur  wrote:

> Yi/Milinda,
>
> I am trying to initialize a kv store. I have the following properties
> defined:
>
> stores.store-name.key.serde=json
>
> stores.store-name.msg.serde=json
>
> stores.store-name.changelog=argos.windowchangelog
> How do I define a key serde as I am getting this exception:
>
> Exception in thread "main" org.apache.samza.SamzaException: Must define a
> key serde when using key value storage.
>
> at
>
> org.apache.samza.storage.kv.BaseKeyValueStorageEngineFactory$class.getStorageEngine(BaseKeyValueStorageEngineFactory.scala:86)
>
> at
>
> org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory.getStorageEngine(RocksDbKeyValueStorageEngineFactory.scala:28)
>
> at
>
> org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:455)
>
> at
>
> org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:439)
>
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>
> at
>
> org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:439)
>
> at
>
> org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:416)
>
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
> at
>
> scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>
> at scala.collection.SetLike$class.map(SetLike.scala:93)
>
> at scala.collection.AbstractSet.map(Set.scala:47)
>
> at
> org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:416)
>
> at
>
> org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:63)
>
> at org.apache.samza.job.JobRunner.run(JobRunner.scala:62)
>
> at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
>
> at org.apache.samza.job.JobRunner.main(JobRunner.scala)
>
> On Mon, Jun 29, 2015 at 12:41 PM, Shekar Tippur  wrote:
>
> > Yi,
> >
> > My use case is more of the latter. Your explanation makes sense now. I
> was
> > also looking into Milinda's wiki. She has a section for Kafka
> > partition SimplePartitioner, which is simple enough as well.
> >
> > Thanks for all the inputs. Let me see what I come up with while
> > implementing it.
> >
> > - Shekar
> >
> > On Mon, Jun 29, 2015 at 10:42 AM, Yi Pan  wrote:
> >
> >> Hi, Shekar,
> >>
> >> First, I would like to clarify what you meant by sliding window: is it
> >> defined as windows with size N and advance step size of 1 (which means
> >> that
> >> windows overlap and each input message would contribute to multiple
> counts
> >> in different windows)? Or windows with size N and advance step size of N
> >> (i.e. each incoming message only contribute to one counter in a single
> >> window)?
> >>
> >> If your use case falls into the first category, you will need something
> >> more sophisticated as discussed in SAMZA-552. If your use case is the
> >> second one, there could be a simpler version of SAMZA-552 that you can
> go
> >> with:
> >>
> >> 1) Initiate a KV-store that uses the application name as the key
> >> 2) For each incoming message, look for the windows that the message by
> the
> >> application name
> >> 3) Update the counter and update the value in the KV-store based on the
> >> application name
> >> 4) Every 5 min when window() method is triggered, set all counters to
> zero
> >> (this can be done in a lazy way as well, by keeping the last reset
> >> timestamp in the record in the KV-store, keyed by application name.
> Then,
> >> resetting counter to zero can be done when next time the application
> >> counter is updated again)
> >>
> >> Hope that makes sense.
> >>
> >> -Yi
> >>
> >> On Mon, Jun 29, 2015 at 10:06 AM, Shekar Tippur 
> >> wrote:
> >>
> >> > Benjamin,
> >> >
> >> > Thanks for the explanation. We dont have any specific partition scheme
> >> as
> >> > yet. We just have 2 topics - raw and processed and we use default
> >> > partitioning scheme.
> >> > Can you share any code snippet so I can understand it better?
> >> >
> >> > - Shekar
> >> >
> >>
> >
> >
>


Re: Samza and sliding window

2015-07-01 Thread Shekar Tippur
Yi/Milinda,

I am trying to initialize a kv store. I have the following properties
defined:

stores.store-name.key.serde=json
stores.store-name.msg.serde=json
stores.store-name.changelog=argos.windowchangelog

How do I define a key serde as I am getting this exception:

Exception in thread "main" org.apache.samza.SamzaException: Must define a key serde when using key value storage.
    at org.apache.samza.storage.kv.BaseKeyValueStorageEngineFactory$class.getStorageEngine(BaseKeyValueStorageEngineFactory.scala:86)
    at org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory.getStorageEngine(RocksDbKeyValueStorageEngineFactory.scala:28)
    at org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:455)
    at org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:439)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:439)
    at org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:416)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
    at scala.collection.SetLike$class.map(SetLike.scala:93)
    at scala.collection.AbstractSet.map(Set.scala:47)
    at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:416)
    at org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:63)
    at org.apache.samza.job.JobRunner.run(JobRunner.scala:62)
    at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
    at org.apache.samza.job.JobRunner.main(JobRunner.scala)

On Mon, Jun 29, 2015 at 12:41 PM, Shekar Tippur  wrote:

> Yi,
>
> My use case is more of the latter. Your explanation makes sense now. I was
> also looking into Milinda's wiki. She has a section for Kafka
> partition SimplePartitioner, which is simple enough as well.
>
> Thanks for all the inputs. Let me see what I come up with while
> implementing it.
>
> - Shekar
>
> On Mon, Jun 29, 2015 at 10:42 AM, Yi Pan  wrote:
>
>> Hi, Shekar,
>>
>> First, I would like to clarify what you meant by sliding window: is it
>> defined as windows with size N and advance step size of 1 (which means
>> that
>> windows overlap and each input message would contribute to multiple counts
>> in different windows)? Or windows with size N and advance step size of N
>> (i.e. each incoming message only contribute to one counter in a single
>> window)?
>>
>> If your use case falls into the first category, you will need something
>> more sophisticated as discussed in SAMZA-552. If your use case is the
>> second one, there could be a simpler version of SAMZA-552 that you can go
>> with:
>>
>> 1) Initiate a KV-store that uses the application name as the key
>> 2) For each incoming message, look for the windows that the message by the
>> application name
>> 3) Update the counter and update the value in the KV-store based on the
>> application name
>> 4) Every 5 min when window() method is triggered, set all counters to zero
>> (this can be done in a lazy way as well, by keeping the last reset
>> timestamp in the record in the KV-store, keyed by application name. Then,
>> resetting counter to zero can be done when next time the application
>> counter is updated again)
>>
>> Hope that makes sense.
>>
>> -Yi
>>
>> On Mon, Jun 29, 2015 at 10:06 AM, Shekar Tippur 
>> wrote:
>>
>> > Benjamin,
>> >
>> > Thanks for the explanation. We dont have any specific partition scheme
>> as
>> > yet. We just have 2 topics - raw and processed and we use default
>> > partitioning scheme.
>> > Can you share any code snippet so I can understand it better?
>> >
>> > - Shekar
>> >
>>
>
>
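
The simpler, non-overlapping case in Yi's quoted steps 1 to 4 can be sketched in plain Java as below. This is only an illustration under stated assumptions: a HashMap stands in for the Samza KV-store, the class and method names (TumblingWindowCounter, process, window, count) are made up for the sketch, and it uses the lazy reset Yi mentions, where each record carries its last reset timestamp and is zeroed the next time it is touched after a window boundary.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; a HashMap stands in for the task's KV-store.
class TumblingWindowCounter {
    /** Value kept per application name: the running count and when it was last reset. */
    static final class Entry {
        final long count;
        final long lastResetMs;

        Entry(long count, long lastResetMs) {
            this.count = count;
            this.lastResetMs = lastResetMs;
        }
    }

    // Step 1: a store keyed by application name.
    private final Map<String, Entry> store = new HashMap<>();
    // Time of the most recent window boundary; assumed to increase on every window() call.
    private long lastWindowMs = Long.MIN_VALUE;

    /** Step 4, lazy variant: remember when the window closed instead of touching every key. */
    public void window(long nowMs) {
        lastWindowMs = nowMs;
    }

    /** Steps 2 and 3: look up the record by application name and bump its counter. */
    public long process(String appName) {
        Entry updated = new Entry(count(appName) + 1, lastWindowMs);
        store.put(appName, updated);
        return updated.count;
    }

    /** Current count for the application; a record older than the last boundary reads as zero. */
    public long count(String appName) {
        Entry e = store.get(appName);
        return (e == null || e.lastResetMs < lastWindowMs) ? 0L : e.count;
    }
}
```

In a real task, process() would be driven by incoming messages and window() by the periodic window() callback (every 5 minutes in Yi's description). The eager alternative is to iterate over the store in window() and write zero for every key.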


Re: [VOTE] Apache Samza 0.9.1 RC1

2015-07-01 Thread Navina Ramesh
+1 

Verified MD5 & asc signatures. Local build & check-all.sh passed.

Thanks!
Navina

On 6/30/15, 7:13 PM, "Chris Riccomini"  wrote:

>+1
>
>Verified MD5, and asc signature. Build locally, and all tests pass.
>
>Cheers,
>Chris
>
>On Tue, Jun 30, 2015 at 1:26 PM, Milinda Pathirage 
>wrote:
>
>> +1 (non-binding)
>>
>> Verified signature. Tested locally using ./bin/check-all.sh.
>>
>> Thanks
>> Milinda
>>
>> On Tue, Jun 30, 2015 at 2:10 AM, Yan Fang  wrote:
>>
>> > +1
>> >
>> > Verified MD5, Signature.
>> >
>> > Tested locally.
>> >
>> > Thanks,
>> >
>> > Fang, Yan
>> > yanfang...@gmail.com
>> >
>> > On Sun, Jun 28, 2015 at 12:31 PM, Yi Pan  wrote:
>> >
>> > > Hey all,
>> > >
>> > > This is a call for a vote on a release of Apache Samza 0.9.1. This is a
>> > > bug-fix release against 0.9.0.
>> > >
>> > > The release candidate can be downloaded from here:
>> > >
>> > > http://people.apache.org/~nickpan47/samza-0.9.1-rc1/
>> > >
>> > > The release candidate is signed with pgp key 911402D8, which is
>> > > included in the repository's KEYS file:
>> > >
>> > > https://git-wip-us.apache.org/repos/asf?p=samza.git;a=blob_plain;f=KEYS;hb=6f5bafb6cd93934781161eb6b1868d11ea347c95
>> > >
>> > > and can also be found on keyservers:
>> > >
>> > > http://pgp.mit.edu/pks/lookup?op=get&search=0x911402D8
>> > >
>> > > The git tag is release-0.9.1-rc1 and signed with the same pgp key:
>> > >
>> > > https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=e78b9e7f34650538b4bb68b338eb472b98a5709e
>> > >
>> > > Test binaries have been published to Maven's staging repository, and are
>> > > available here:
>> > >
>> > > https://repository.apache.org/content/repositories/orgapachesamza-1007/
>> > >
>> > > Note release 0.9.1 is still supporting JDK6 and the binaries were built
>> > > with JDK6 without incident.
>> > >
>> > > 6 critical bugs were resolved for this release:
>> > >
>> > > https://issues.apache.org/jira/browse/SAMZA-715?jql=project%20%3D%20SAMZA%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20%28Resolved%2C%20Closed%29
>> > >
>> > > The vote will be open for 72 hours ( end in 12:00pm Wed, 07/01/2015 ).
>> > > Please download the release candidate, check the hashes/signature, build it
>> > > and test it, and then please vote:
>> > >
>> > > [ ] +1 approve
>> > > [ ] +0 no opinion
>> > > [ ] -1 disapprove (and reason why)
>> > >
>> >
>>
>>
>>
>> --
>> Milinda Pathirage
>>
>> PhD Student | Research Assistant
>> School of Informatics and Computing | Data to Insight Center
>> Indiana University
>>
>> twitter: milindalakmal
>> skype: milinda.pathirage
>> blog: http://milinda.pathirage.org
>>



Re: Best way to log from inside a Samza task?

2015-07-01 Thread Rick Mangi
Hi Jason,

It depends on how you’re deploying. I deploy on YARN via Cloudera, and my
logging statements wind up in the container log for each node.

You need to instantiate a logger for your class before writing to it. Look at
the log4j docs to see the right way to do that.

Rick
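
A minimal sketch of the "instantiate a logger for your class first" point, for readers of the archive. To stay free of extra jars it uses the JDK's java.util.logging as a stand-in. That is an assumption of the sketch, not what the job in this thread uses. With log4j 1.x the field would be created with org.apache.log4j.Logger.getLogger(LoggingTaskSketch.class) instead. LoggingTaskSketch and handle() are hypothetical names; handle() stands in for the body of StreamTask.process().

```java
import java.util.logging.Logger;

// Hypothetical sketch; java.util.logging stands in for log4j so the block has no dependencies.
class LoggingTaskSketch {
    // Created once per class, before any method writes to it.
    private static final Logger logger = Logger.getLogger(LoggingTaskSketch.class.getName());

    /** Stand-in for the body of process(): log the incoming payload and return its text form. */
    public String handle(Object message) {
        // The payload is an Object, so convert it explicitly; this info() overload takes a String.
        String text = String.valueOf(message);
        logger.info(text);
        return text;
    }
}
```

Where the output lands depends on the logging configuration of the deployment, as described above for the container logs.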


> On Jul 1, 2015, at 2:58 PM, ja...@marketingscience.co wrote:
> 
> Rick, 
> 
> From your example, what is the correct way to call the logger in the task? 
> Something like:
> 
> import java.util.Map;
> import org.apache.samza.system.IncomingMessageEnvelope;
> import org.apache.samza.system.OutgoingMessageEnvelope;
> import org.apache.samza.system.SystemStream;
> import org.apache.samza.task.MessageCollector;
> import org.apache.samza.task.StreamTask;
> import org.apache.samza.task.TaskCoordinator;
> 
> public class uimodPageloadStreamTask implements StreamTask {
>   private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "kafka-output-test");
>
>   @Override
>   public void process(IncomingMessageEnvelope envelope,
>                       MessageCollector collector,
>                       TaskCoordinator coordinator) {
>     logger.info(envelope.getMessage());
>     collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, text));
>   }
> }
> 
> Also, in what log file would the log messages end up? /logs/stdout for that 
> application container?
> 
> I got System.out.println() to work similarly but it seems to be doing some 
> formatting of the messages. 
> 
> I am trying to consume messages from one Kafka topic and produce to another. 
> While I can use zookeeper to see the messages in the originating topic they 
> never make it to the destination and I am trying to find out why. 
> 
> Thanks for your help,
> 
> Jason
> 
> 
> On Friday, Jun 26, 2558 at 12:03, Rick Mangi  >, wrote:
> If you do something like this in your log4j.xml 
> 
>  
>  
>  
>  
> 
>  
>  
>  
>  
> 
> the root controls samza’s logging and the logger controls your own… I haven’t 
> managed to get the imx configuration working yet. 
> 
> 
> 
> 
> > On Jun 26, 2015, at 1:50 PM, ja...@marketingscience.co wrote:
> >
> > I was almost there. Got it now. Thanks for your help Rick.
> >
> > Cheers,
> >
> > Jason
> >
> > On Friday, Jun 26, 2558 at 11:43, Rick Mangi , wrote:
> > Hey Jason,
> >
> > If you configure log4j as described here:
> > http://samza.apache.org/learn/documentation/0.9/jobs/logging.html
> >
> > Your log statements will wind up in the samza-container logs which you can
> > get to via the application master gui.
> >
> > hth,
> >
> > Rick
> >
> >> On Jun 26, 2015, at 1:39 PM, ja...@marketingscience.co wrote: 
> > 
> >> 
> > 
> >> Hello, 
> > 
> >> 
> > 
> >> 
> > 
> >> I am working on a basic Samza task that pulls from one Kafka topic and 
> >> writes to another. This task runs in Yarn but the Output topic does not 
> >> contain any data. 
> > 
> >> 
> > 
> >> 
> > 
> >> In order to troubleshoot this more effectively I would like to log the 
> >> incoming message as my example below. Ideally, I would like to be able to 
> >> see the log messages in Yarn, maybe in the .out files in the /logs 
> >> directory. 
> > 
> >> 
> > 
> >> 
> > 
> >> Any advice is appreciated. 
> > 
> >> 
> > 
> >> 
> > 
> >> 
> > 
> >> Here is my task: 
> > 
> >> 
> > 
> >> 
> > 
> >> package com.project.samza.tasks; 
> > 
> >> 
> > 
> >> 
> > 
> >> import java.util.Map; 
> > 
> >> import org.apache.samza.system.IncomingMessageEnvelope; 
> > 
> >> import org.apache.samza.system.OutgoingMessageEnvelope; 
> > 
> >> import org.apache.samza.system.SystemStream; 
> > 
> >> import org.apache.samza.task.MessageCollector; 
> > 
> >> import org.apache.samza.task.StreamTask; 
> > 
> >> import org.apache.samza.task.TaskCoordinator; 
> > 
> >> 
> > 
> >> 
> > 
> >> public class exampleStreamTask implements StreamTask { 
> > 
> >> private static final SystemStream OUTPUT_STREAM = new 
> >> SystemStream(“kafka”, “new-topic-test”); 
> > 
> >> 
> > 
> >> 
> > 
> >> @Override 
> > 
> >> public void process(IncomingMessageEnvelope envelope, 
> > 
> >> MessageCollector collector, 
> > 
> >> TaskCoordinator coordinator) { 
> > 
> >> String msg = (String) envelope.getMessage(); 
> > 
> >> System.out.println(msg); 
> > 
> >> collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, msg)); 
> > 
> >> } 
> > 
> >> } 
> > 
> >> 
> > 
> >> 
> > 
> >> Thanks, 
> > 
> >> 
> > 
> >> 
> > 
> >> Jason 
> 



Re: Best way to log from inside a Samza task?

2015-07-01 Thread jason
Rick,

From your example, what is the correct way to call the logger in the task?
Something like:

import java.util.Map;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class uimodPageloadStreamTask implements StreamTask {
  private static final SystemStream OUTPUT_STREAM =
      new SystemStream("kafka", "kafka-output-test");

  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    logger.info(envelope.getMessage());
    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, text));
  }
}

Also, in what log file would the log messages end up? /logs/stdout for that
application container?

I got System.out.println() to work similarly but it seems to be doing some
formatting of the messages.

I am trying to consume messages from one Kafka topic and produce to another.
While I can use zookeeper to see the messages in the originating topic they
never make it to the destination and I am trying to find out why.

Thanks for your help,

Jason

On Friday, Jun 26, 2558 at 12:03, Rick Mangi wrote:
If you do something like this in your log4j.xml

the root controls samza’s logging and the logger controls your own… I haven’t
managed to get the jmx configuration working yet.





> On Jun 26, 2015, at 1:50 PM, ja...@marketingscience.co wrote:
>
> I was almost there. Got it now. Thanks for your help Rick.
>
> Cheers,
>
> Jason
>
> On Friday, Jun 26, 2558 at 11:43, Rick Mangi wrote:
> Hey Jason,
>
> If you configure log4j as described here:
> http://samza.apache.org/learn/documentation/0.9/jobs/logging.html
>
> Your log statements will wind up in the samza-container logs which you can
> get to via the application master gui.
>
> hth,
>
> Rick
>
>> On Jun 26, 2015, at 1:39 PM, ja...@marketingscience.co wrote:
>>
>> Hello,
>>
>> I am working on a basic Samza task that pulls from one Kafka topic and
>> writes to another. This task runs in Yarn but the Output topic does not
>> contain any data.
>>
>> In order to troubleshoot this more effectively I would like to log the
>> incoming message as my example below. Ideally, I would like to be able to
>> see the log messages in Yarn, maybe in the .out files in the /logs
>> directory.
>>
>> Any advice is appreciated.
>>
>> Here is my task:
>>
>> package com.project.samza.tasks;
>>
>> import java.util.Map;
>> import org.apache.samza.system.IncomingMessageEnvelope;
>> import org.apache.samza.system.OutgoingMessageEnvelope;
>> import org.apache.samza.system.SystemStream;
>> import org.apache.samza.task.MessageCollector;
>> import org.apache.samza.task.StreamTask;
>> import org.apache.samza.task.TaskCoordinator;
>>
>> public class exampleStreamTask implements StreamTask {
>>   private static final SystemStream OUTPUT_STREAM =
>>       new SystemStream("kafka", "new-topic-test");
>>
>>   @Override
>>   public void process(IncomingMessageEnvelope envelope,
>>                       MessageCollector collector,
>>                       TaskCoordinator coordinator) {
>>     String msg = (String) envelope.getMessage();
>>     System.out.println(msg);
>>     collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, msg));
>>   }
>> }
>>
>> Thanks,
>>
>> Jason

Re: SamzaAppMaster failling on yarn 2.5.1

2015-07-01 Thread Yi Pan
Hi, Nelson,

We build and test Samza against YARN-2.5. There should not be an
incompatibility issue here. From your logs, it seems that it is a security
exception. Could you let us know your YARN site configuration? Is there any
security mechanism configured in your YARN cluster that requires the
AppMaster to register with some security credentials? We could try to
reproduce the issue if we knew your configuration better.

Thanks!

-Yi

On Wed, Jul 1, 2015 at 6:33 AM, Nelson VERDIER 
wrote:

> Hi,
>
> Being curious about samza i've decided to play with it a little. Having
> successfully run hello samza plus some custom StreamTasks of my own on my
> local computer i decided to reproduce the experience on a in-house cluster
> we have.
>
> The cluster is a MapR 4.1.0 running yarn 2.5.1. The problem we face is when
> yarn is trying to register samza app master.
>
> Below are container logs:
>
> Exception in thread "main" javax.security.sasl.SaslException:
> DIGEST-MD5: digest response format violation. Mismatched URI:
> default/; expecting: null/default [Caused by
> org.apache.hadoop.ipc.RemoteException(javax.security.sasl.SaslException):
> DIGEST-MD5: digest response format violation. Mismatched URI:
> default/; expecting: null/default]
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>     at org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
>     at org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)
>     at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.registerApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:109)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
>     at com.sun.proxy.$Proxy17.registerApplicationMaster(Unknown Source)
>     at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.registerApplicationMaster(AMRMClientImpl.java:196)
>     at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl.registerApplicationMaster(AMRMClientAsyncImpl.java:138)
>     at org.apache.samza.job.yarn.SamzaAppMasterLifecycle.onInit(SamzaAppMasterLifecycle.scala:39)
>     at org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$run$1.apply(SamzaAppMaster.scala:108)
>     at org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$run$1.apply(SamzaAppMaster.scala:108)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at org.apache.samza.job.yarn.SamzaAppMaster$.run(SamzaAppMaster.scala:108)
>     at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:95)
>     at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
>
> And RM logs:
>
> 2015-07-01 14:13:28,551 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1435679592284_0004_01 State change from ALLOCATED to LAUNCHED
> 2015-07-01 14:13:29,519 INFO org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl: container_1435679592284_0004_01_01 Container Transitioned from ACQUIRED to RUNNING
> 2015-07-01 14:13:34,899 WARN SecurityLogger.org.apache.hadoop.ipc.Server: Auth failed for 10.19.1.194:44153:null (DIGEST-MD5: digest response format violation. Mismatched URI: default/; expecting: null/default)
> 2015-07-01 14:13:34,900 INFO org.apache.hadoop.ipc.Server: Socket Reader #1 for port 8030: readAndProcess from client 10.19.1.194 threw exception [javax.security.sasl.SaslException: DIGEST-MD5: digest response format violation. Mismatched URI: default/; expecting: null/default]
> 2015-07-01 14:13:35,763 WARN SecurityLogger.org.apache.hadoop.ipc.Server: Auth failed for 10.19.1.194:44154:null (DIGEST-MD5: digest response format violation. Mismatched URI: default/; expecting: null/default)
> 2015-07-01 14:13:35,764 INFO org.apache.hadoop.ipc.Server: Socket Reader #1 for port 8030: readAndProcess from client 10.19.1.194 threw exception [javax.security.sasl.SaslException: DIGEST-MD5: digest response format violation. Mismatched URI: default/; expecting: null/default]
>
> I am desperate to understand why the samzaAppMaster can't be

SamzaAppMaster failling on yarn 2.5.1

2015-07-01 Thread Nelson VERDIER
Hi,

Being curious about Samza, I decided to play with it a little. Having
successfully run hello-samza plus some custom StreamTasks of my own on my
local computer, I decided to reproduce the experience on an in-house cluster
we have.

The cluster is MapR 4.1.0 running YARN 2.5.1. The problem we face arises when
the Samza app master is being registered with YARN.

Below are container logs:

Exception in thread "main" javax.security.sasl.SaslException:
DIGEST-MD5: digest response format violation. Mismatched URI:
default/; expecting: null/default [Caused by
org.apache.hadoop.ipc.RemoteException(javax.security.sasl.SaslException):
DIGEST-MD5: digest response format violation. Mismatched URI:
default/; expecting: null/default]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
    at org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)
    at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.registerApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:109)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
    at com.sun.proxy.$Proxy17.registerApplicationMaster(Unknown Source)
    at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.registerApplicationMaster(AMRMClientImpl.java:196)
    at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl.registerApplicationMaster(AMRMClientAsyncImpl.java:138)
    at org.apache.samza.job.yarn.SamzaAppMasterLifecycle.onInit(SamzaAppMasterLifecycle.scala:39)
    at org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$run$1.apply(SamzaAppMaster.scala:108)
    at org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$run$1.apply(SamzaAppMaster.scala:108)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.samza.job.yarn.SamzaAppMaster$.run(SamzaAppMaster.scala:108)
    at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:95)
    at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)

And RM logs:

2015-07-01 14:13:28,551 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1435679592284_0004_01 State change from ALLOCATED to LAUNCHED
2015-07-01 14:13:29,519 INFO org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl: container_1435679592284_0004_01_01 Container Transitioned from ACQUIRED to RUNNING
2015-07-01 14:13:34,899 WARN SecurityLogger.org.apache.hadoop.ipc.Server: Auth failed for 10.19.1.194:44153:null (DIGEST-MD5: digest response format violation. Mismatched URI: default/; expecting: null/default)
2015-07-01 14:13:34,900 INFO org.apache.hadoop.ipc.Server: Socket Reader #1 for port 8030: readAndProcess from client 10.19.1.194 threw exception [javax.security.sasl.SaslException: DIGEST-MD5: digest response format violation. Mismatched URI: default/; expecting: null/default]
2015-07-01 14:13:35,763 WARN SecurityLogger.org.apache.hadoop.ipc.Server: Auth failed for 10.19.1.194:44154:null (DIGEST-MD5: digest response format violation. Mismatched URI: default/; expecting: null/default)
2015-07-01 14:13:35,764 INFO org.apache.hadoop.ipc.Server: Socket Reader #1 for port 8030: readAndProcess from client 10.19.1.194 threw exception [javax.security.sasl.SaslException: DIGEST-MD5: digest response format violation. Mismatched URI: default/; expecting: null/default]

I am desperate to understand why the SamzaAppMaster can't be launched
properly. Is YARN 2.5 compatible with Samza 0.9? Do you have any idea why
we have this DIGEST-MD5 issue?

Regards,

-- 
*Nelson Verdier*


Review Request 36089: SAMZA-670 Allow easier access to JMX port

2015-07-01 Thread József Márton Jung

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36089/
---

Review request for samza.


Repository: samza


Description
---

The JMX addresses of the application master and the containers are available through the AM UI.


Diffs
-

  checkstyle/import-control.xml 3374f0c
  samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java fd7333b
  samza-core/src/main/java/org/apache/samza/container/LocalityManager.java e661e12
  samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java 6c1e488
  samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java 98a34bc
  samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 95a2ce5
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala cbacd18
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 8ee034a
  samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala f343faf
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 9fb1aa9
  samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 7caad28
  samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala 1ce7d25
  samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml cf0d2fc
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala 20aa373
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 1445605

Diff: https://reviews.apache.org/r/36089/diff/


Testing
---


Thanks,

József Márton Jung