Re: Review Request 36903: SAMZA-744: shutdown stores before shutdown producers

2015-07-29 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36903/#review93524
---

Ship it!


Ship It!

- Navina Ramesh


On July 29, 2015, 9:48 p.m., Yi Pan (Data Infrastructure) wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36903/
> ---
> 
> (Updated July 29, 2015, 9:48 p.m.)
> 
> 
> Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and 
> Navina Ramesh.
> 
> 
> Bugs: SAMZA-744
> https://issues.apache.org/jira/browse/SAMZA-744
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-744: shutdown stores before shutdown producers
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 27b2517048ad5730762506426ee7578c66181db8 
> 
> Diff: https://reviews.apache.org/r/36903/diff/
> 
> 
> Testing
> ---
> 
> ./bin/check-all.sh passed
> 
> 
> Thanks,
> 
> Yi Pan (Data Infrastructure)
> 
>



Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-07-29 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/
---

(Updated July 29, 2015, 10:49 p.m.)


Review request for samza.


Changes
---

remove whitespaces
update to latest master


Bugs: SAMZA-676
https://issues.apache.org/jira/browse/SAMZA-676


Repository: samza


Description
---

1. added offsetComparator method in SystemAdmin Interface

2. added "task.global.inputs" config

3. rewrote Grouper classes in Java; this allows global streams to be assigned 
during grouping

4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer to 
preserve message order

5. added taskNames to the offsets in OffsetManager

6. allowed one SSP to be assigned to multiple taskInstances

7. skipped already-processed messages in RunLoop

8. unit tests for all changes
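
Item 4 above relies on a standard property of the Java collections: a LinkedHashSet iterates in insertion order, while a HashSet makes no ordering guarantee. A minimal sketch of that behaviour follows. The class name, method name, and message strings are made up for illustration and are not taken from CoordinatorStreamSystemConsumer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; not code from the patch.
class OrderedDedupSketch {
    // Drops duplicate messages while keeping the order in which each was first seen.
    static List<String> dedupInArrivalOrder(List<String> messages) {
        Set<String> seen = new LinkedHashSet<>(messages);
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        List<String> arrivals =
            Arrays.asList("set-config", "set-checkpoint", "set-config", "set-changelog");
        // prints [set-config, set-checkpoint, set-changelog]
        System.out.println(dedupInArrivalOrder(arrivals));
    }
}
```

Swapping in a HashSet would still remove the duplicate, but the iteration order would then depend on hash codes rather than on arrival order.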


Diffs (updated)
-

  checkstyle/import-control.xml 6654319 
  docs/learn/documentation/versioned/container/samza-container.md 9f46414 
  docs/learn/documentation/versioned/jobs/configuration-table.html ea73b40 
  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb 
  
samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
 249b8ae 
  samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 
PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
 PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
20e5d26 
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
27b2517 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
c5a5ea5 
  
samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 
9dc7051 
  
samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
 44e95fc 
  
samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
 3c0acad 
  
samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
 097f410 
  samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java 
PRE-CREATION 
  
samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
 PRE-CREATION 
  
samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
 PRE-CREATION 
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
8d54c46 
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
64a5844 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
84fdeaa 
  samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
7caad28 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
 a14169b 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
 74daf72 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
 deb3895 
  
samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 
4097ac7 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
 1fd5dd3 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 
35086f5 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
 de00320 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
 1629035 
  
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
 2a84328 
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
b063366 
  
samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
 1e936b4 

Diff: https://reviews.apache.org/r/34974/diff/


Testing
---


Thanks,

Yan Fang



Re: Review Request 36903: SAMZA-744: shutdown stores before shutdown producers

2015-07-29 Thread Yi Pan (Data Infrastructure)

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36903/
---

(Updated July 29, 2015, 9:48 p.m.)


Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Navina 
Ramesh.


Summary (updated)
-

SAMZA-744: shutdown stores before shutdown producers


Bugs: SAMZA-744
https://issues.apache.org/jira/browse/SAMZA-744


Repository: samza


Description
---

SAMZA-744: shutdown stores before shutdown producers
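
One plausible reason the order matters (an assumption, since this request does not spell it out): a store that flushes buffered changelog writes when it closes still needs a live producer at that moment. The sketch below models that with two hypothetical classes, Producer and Store, which are stand-ins and not Samza's own types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the ordering problem; none of these types exist in Samza.
class ShutdownOrderSketch {
    static class Producer {
        final List<String> sent = new ArrayList<>();
        private boolean stopped = false;

        void send(String message) {
            if (stopped) {
                throw new IllegalStateException("producer already stopped");
            }
            sent.add(message);
        }

        void stop() {
            stopped = true;
        }
    }

    static class Store {
        private final Producer changelog;
        private final List<String> pending = new ArrayList<>();

        Store(Producer changelog) {
            this.changelog = changelog;
        }

        void put(String key) {
            pending.add(key);
        }

        // Assumed behaviour: closing the store flushes buffered writes to the changelog.
        void close() {
            for (String key : pending) {
                changelog.send(key);
            }
            pending.clear();
        }
    }

    // Stores first, then producers: both buffered writes reach the producer.
    static int shutdownStoresFirst() {
        Producer producer = new Producer();
        Store store = new Store(producer);
        store.put("k1");
        store.put("k2");
        store.close();
        producer.stop();
        return producer.sent.size();
    }

    // Producers first: the flush during store.close() fails.
    static boolean shutdownProducersFirstFails() {
        Producer producer = new Producer();
        Store store = new Store(producer);
        store.put("k1");
        producer.stop();
        try {
            store.close();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }
}
```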


Diffs
-

  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
27b2517048ad5730762506426ee7578c66181db8 

Diff: https://reviews.apache.org/r/36903/diff/


Testing (updated)
---

./bin/check-all.sh passed


Thanks,

Yi Pan (Data Infrastructure)



Re: Review Request 35445: SAMZA-693: Very basic HDFS Producer service for Samza

2015-07-29 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35445/#review93501
---


Just a few comments. Overall, looks good!


docs/learn/documentation/versioned/hdfs/producer.md (line 24)


Can you please update the list of available configuration for this system at
http://samza.apache.org/learn/documentation/latest/jobs/configuration-table.html
as well?

As we did with Elasticsearch, adding an example job to hello-samza will 
be very useful for adoption. If not in this JIRA, please consider opening a 
follow-up JIRA to add this example.



samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/TextSequenceFileHdfsWriter.scala
 (line 37)


What is the motivation for having 2 sequenceFileHdfsWriters?  Is the only 
difference in the data type being written to the fs? 

The difference is not clear from the class's javadoc or the example 
configuration provided. The reason I ask is we already have a config overload 
in Samza. I would rather not have the user configure a writer type, if we can 
safely default to a single type and only override the relevant diverging 
properties. Functionally, I believe they are doing the same thing.


- Navina Ramesh


On July 28, 2015, 5:25 a.m., Eli Reisman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/35445/
> ---
> 
> (Updated July 28, 2015, 5:25 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-693: Very basic HDFS Producer service for Samza
> 
> 
> Diffs
> -
> 
>   build.gradle 0852adc 
>   docs/learn/documentation/versioned/hdfs/producer.md PRE-CREATION 
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala 
> PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala 
> PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducerMetrics.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/BinarySequenceFileHdfsWriter.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/Bucketer.scala 
> PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/HdfsWriter.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/JobNameDateTimeBucketer.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/SequenceFileHdfsWriter.scala
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/TextSequenceFileHdfsWriter.scala
>  PRE-CREATION 
>   samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties 
> PRE-CREATION 
>   samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties 
> PRE-CREATION 
>   samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties 
> PRE-CREATION 
>   samza-hdfs/src/test/resources/samza-hdfs-test-job.properties PRE-CREATION 
>   
> samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
>  PRE-CREATION 
>   settings.gradle 19bff97 
> 
> Diff: https://reviews.apache.org/r/35445/diff/
> 
> 
> Testing
> ---
> 
> Updated: See JIRA SAMZA-693 for details. This latest update (693-4) addresses 
> post-review issues and adds a more pluggable design, several default writer 
> implementations, and more (and more thorough) unit tests.
> 
> Passes 'gradle clean test'.
> 
> 
> Thanks,
> 
> Eli Reisman
> 
>



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Roger Hoover
Thanks, Yi!

On Wed, Jul 29, 2015 at 12:16 PM, Yi Pan  wrote:

> Hi, Roger,
>
> I am testing the patch now. Will update the JIRA soon.
>
> Thanks!
>
> -Yi
>
> On Wed, Jul 29, 2015 at 12:11 PM, Roger Hoover 
> wrote:
>
> > Thank you, Dan.  I think we're ready to merge.  Can one of the Samza
> > committers please take a look?
> >
> > On Wed, Jul 29, 2015 at 11:31 AM, Dan Harvey 
> > wrote:
> >
> > >This is an automatically generated e-mail. To reply, visit:
> > > https://reviews.apache.org/r/36815/
> > >
> > > On July 29th, 2015, 8:42 a.m. UTC, *Dan Harvey* wrote:
> > >
> > >
> > >
> >
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
> > > <
> >
> https://reviews.apache.org/r/36815/diff/6/?file=1024157#file1024157line116
> >
> > (Diff
> > > revision 6)
> > >
> > > public void register(final String source) {
> > >
> > >116
> > >
> > >   LOGGER.info(itemResp.getFailureMessage());
> > >
> > >   Should we add a Samza-specific message, then add the whole exception,
> > > so it's clearer where the exception came from if the user doesn't know
> > > the code? Logger.info("Failed to index message in ElasticSearch.", e);
> > >
> > > This would also be true for other log lines added.
> > >
> > >  On July 29th, 2015, 6:22 p.m. UTC, *Roger Hoover* wrote:
> > >
> > > Good idea.  Thanks.
> > >
> > > BTW, it didn't work like this: Logger.info("Failed to index message in
> > ElasticSearch.", itemResp.getFailure()) so I did this:
> > >
> > > LOGGER.error("Failed to index document in Elasticsearch: " +
> > itemResp.getFailureMessage());
> > >
> > >  On July 29th, 2015, 6:28 p.m. UTC, *Roger Hoover* wrote:
> > >
> > > This is what the messages look like
> > >
> > > 2015-07-29 11:15:02 ElasticsearchSystemProducer [INFO] Failed to index
> > document in Elasticsearch:
> > VersionConflictEngineException[[test-embedded.2015-07-29][0] [stuff][d3]:
> > version conflict, current [9], provided [5]]
> > >
> > >  That looks fine!
> > >
> > >
> > > - Dan
> > >
> > > On July 29th, 2015, 6:24 p.m. UTC, Roger Hoover wrote:
> > >   Review request for samza and Dan Harvey.
> > > By Roger Hoover.
> > >
> > > *Updated July 29, 2015, 6:24 p.m.*
> > >  *Repository: * samza
> > > Description
> > >
> > > SAMZA-741 Add support for versioning to Elasticsearch System Producer
> > >
> > >   Testing
> > >
> > > Refactored DefaultIndexRequestFactory to make it easier to subclass and
> > customize to handle version and version_type parameters.
> > >
> > >   Diffs
> > >
> > >-
> >
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
> > >(f61bd36)
> > >-
> >
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
> > >(e3b635b)
> > >-
> >
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
> > >(afe0eee)
> > >-
> >
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
> > >(980964f)
> > >-
> >
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
> > >(684d7f6)
> > >
> > > View Diff 
> > >
> >
>


Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Yi Pan
Hi, Roger,

I am testing the patch now. Will update the JIRA soon.

Thanks!

-Yi

On Wed, Jul 29, 2015 at 12:11 PM, Roger Hoover 
wrote:

> Thank you, Dan.  I think we're ready to merge.  Can one of the Samza
> committers please take a look?
>
> On Wed, Jul 29, 2015 at 11:31 AM, Dan Harvey 
> wrote:
>
> >This is an automatically generated e-mail. To reply, visit:
> > https://reviews.apache.org/r/36815/
> >
> > On July 29th, 2015, 8:42 a.m. UTC, *Dan Harvey* wrote:
> >
> >
> >
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
> > <
> https://reviews.apache.org/r/36815/diff/6/?file=1024157#file1024157line116>
> (Diff
> > revision 6)
> >
> > public void register(final String source) {
> >
> >116
> >
> >   LOGGER.info(itemResp.getFailureMessage());
> >
> >   Should we add a Samza-specific message, then add the whole exception,
> > so it's clearer where the exception came from if the user doesn't know the
> > code? Logger.info("Failed to index message in ElasticSearch.", e);
> >
> > This would also be true for other log lines added.
> >
> >  On July 29th, 2015, 6:22 p.m. UTC, *Roger Hoover* wrote:
> >
> > Good idea.  Thanks.
> >
> > BTW, it didn't work like this: Logger.info("Failed to index message in
> ElasticSearch.", itemResp.getFailure()) so I did this:
> >
> > LOGGER.error("Failed to index document in Elasticsearch: " +
> itemResp.getFailureMessage());
> >
> >  On July 29th, 2015, 6:28 p.m. UTC, *Roger Hoover* wrote:
> >
> > This is what the messages look like
> >
> > 2015-07-29 11:15:02 ElasticsearchSystemProducer [INFO] Failed to index
> document in Elasticsearch:
> VersionConflictEngineException[[test-embedded.2015-07-29][0] [stuff][d3]:
> version conflict, current [9], provided [5]]
> >
> >  That looks fine!
> >
> >
> > - Dan
> >
> > On July 29th, 2015, 6:24 p.m. UTC, Roger Hoover wrote:
> >   Review request for samza and Dan Harvey.
> > By Roger Hoover.
> >
> > *Updated July 29, 2015, 6:24 p.m.*
> >  *Repository: * samza
> > Description
> >
> > SAMZA-741 Add support for versioning to Elasticsearch System Producer
> >
> >   Testing
> >
> > Refactored DefaultIndexRequestFactory to make it easier to subclass and
> customize to handle version and version_type parameters.
> >
> >   Diffs
> >
> >-
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
> >(f61bd36)
> >-
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
> >(e3b635b)
> >-
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
> >(afe0eee)
> >-
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
> >(980964f)
> >-
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
> >(684d7f6)
> >
> > View Diff 
> >
>


Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Roger Hoover
Thank you, Dan.  I think we're ready to merge.  Can one of the Samza
committers please take a look?

On Wed, Jul 29, 2015 at 11:31 AM, Dan Harvey  wrote:

>This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36815/
>
> On July 29th, 2015, 8:42 a.m. UTC, *Dan Harvey* wrote:
>
>
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  
> (Diff
> revision 6)
>
> public void register(final String source) {
>
>116
>
>   LOGGER.info(itemResp.getFailureMessage());
>
>   Should we add a Samza-specific message, then add the whole exception, so 
> it's clearer where the exception came from if the user doesn't know the 
> code? Logger.info("Failed to index message in ElasticSearch.", e);
>
> This would also be true for other log lines added.
>
>  On July 29th, 2015, 6:22 p.m. UTC, *Roger Hoover* wrote:
>
> Good idea.  Thanks.
>
> BTW, it didn't work like this: Logger.info("Failed to index message in 
> ElasticSearch.", itemResp.getFailure()) so I did this:
>
> LOGGER.error("Failed to index document in Elasticsearch: " + 
> itemResp.getFailureMessage());
>
>  On July 29th, 2015, 6:28 p.m. UTC, *Roger Hoover* wrote:
>
> This is what the messages look like
>
> 2015-07-29 11:15:02 ElasticsearchSystemProducer [INFO] Failed to index 
> document in Elasticsearch: 
> VersionConflictEngineException[[test-embedded.2015-07-29][0] [stuff][d3]: 
> version conflict, current [9], provided [5]]
>
>  That looks fine!
>
>
> - Dan
>
> On July 29th, 2015, 6:24 p.m. UTC, Roger Hoover wrote:
>   Review request for samza and Dan Harvey.
> By Roger Hoover.
>
> *Updated July 29, 2015, 6:24 p.m.*
>  *Repository: * samza
> Description
>
> SAMZA-741 Add support for versioning to Elasticsearch System Producer
>
>   Testing
>
> Refactored DefaultIndexRequestFactory to make it easier to subclass and 
> customize to handle version and version_type parameters.
>
>   Diffs
>
>- 
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>(f61bd36)
>- 
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>(e3b635b)
>- 
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
>(afe0eee)
>- 
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
>(980964f)
>- 
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
>(684d7f6)
>
> View Diff 
>


Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Dan Harvey


> On July 29, 2015, 8:42 a.m., Dan Harvey wrote:
> > samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
> >  line 116
> > 
> >
> > Should we add a Samza-specific message, then add the whole exception, so 
> > it's clearer where the exception came from if the user doesn't know the 
> > code? `Logger.info("Failed to index message in ElasticSearch.", e);`
> > 
> > This would also be true for other log lines added.
> 
> Roger Hoover wrote:
> Good idea.  Thanks.
> 
> BTW, it didn't work like this: Logger.info("Failed to index message in 
> ElasticSearch.", itemResp.getFailure()) so I did this:
> 
> LOGGER.error("Failed to index document in Elasticsearch: " + 
> itemResp.getFailureMessage());
> 
> Roger Hoover wrote:
> This is what the messages look like
> 
> ```2015-07-29 11:15:02 ElasticsearchSystemProducer [INFO] Failed to index 
> document in Elasticsearch: 
> VersionConflictEngineException[[test-embedded.2015-07-29][0] [stuff][d3]: 
> version conflict, current [9], provided [5]]```

That looks fine!


- Dan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/#review93413
---


On July 29, 2015, 6:24 p.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36815/
> ---
> 
> (Updated July 29, 2015, 6:24 p.m.)
> 
> 
> Review request for samza and Dan Harvey.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-741 Add support for versioning to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  f61bd36 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  e3b635b 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
>  afe0eee 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
>  980964f 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
>  684d7f6 
> 
> Diff: https://reviews.apache.org/r/36815/diff/
> 
> 
> Testing
> ---
> 
> Refactored DefaultIndexRequestFactory to make it easier to subclass and 
> customize to handle version and version_type parameters.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Yi Pan (Data Infrastructure)

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/#review93493
---

Ship it!


LGTM. Thanks a lot!

- Yi Pan (Data Infrastructure)


On July 29, 2015, 6:24 p.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36815/
> ---
> 
> (Updated July 29, 2015, 6:24 p.m.)
> 
> 
> Review request for samza and Dan Harvey.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-741 Add support for versioning to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  f61bd36 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  e3b635b 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
>  afe0eee 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
>  980964f 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
>  684d7f6 
> 
> Diff: https://reviews.apache.org/r/36815/diff/
> 
> 
> Testing
> ---
> 
> Refactored DefaultIndexRequestFactory to make it easier to subclass and 
> customize to handle version and version_type parameters.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Roger Hoover


> On July 29, 2015, 8:42 a.m., Dan Harvey wrote:
> > samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
> >  line 116
> > 
> >
> > Should we add a Samza-specific message, then add the whole exception, so 
> > it's clearer where the exception came from if the user doesn't know the 
> > code? `Logger.info("Failed to index message in ElasticSearch.", e);`
> > 
> > This would also be true for other log lines added.
> 
> Roger Hoover wrote:
> Good idea.  Thanks.
> 
> BTW, it didn't work like this: Logger.info("Failed to index message in 
> ElasticSearch.", itemResp.getFailure()) so I did this:
> 
> LOGGER.error("Failed to index document in Elasticsearch: " + 
> itemResp.getFailureMessage());

This is what the messages look like

```2015-07-29 11:15:02 ElasticsearchSystemProducer [INFO] Failed to index 
document in Elasticsearch: 
VersionConflictEngineException[[test-embedded.2015-07-29][0] [stuff][d3]: 
version conflict, current [9], provided [5]]```


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/#review93413
---


On July 29, 2015, 6:24 p.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36815/
> ---
> 
> (Updated July 29, 2015, 6:24 p.m.)
> 
> 
> Review request for samza and Dan Harvey.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-741 Add support for versioning to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  f61bd36 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  e3b635b 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
>  afe0eee 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
>  980964f 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
>  684d7f6 
> 
> Diff: https://reviews.apache.org/r/36815/diff/
> 
> 
> Testing
> ---
> 
> Refactored DefaultIndexRequestFactory to make it easier to subclass and 
> customize to handle version and version_type parameters.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Roger Hoover

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/
---

(Updated July 29, 2015, 6:24 p.m.)


Review request for samza and Dan Harvey.


Changes
---

More descriptive log messages


Repository: samza


Description
---

SAMZA-741 Add support for versioning to Elasticsearch System Producer


Diffs (updated)
-

  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 f61bd36 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 e3b635b 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
 afe0eee 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
 980964f 
  
samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
 684d7f6 

Diff: https://reviews.apache.org/r/36815/diff/


Testing
---

Refactored DefaultIndexRequestFactory to make it easier to subclass and 
customize to handle version and version_type parameters.


Thanks,

Roger Hoover



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Roger Hoover


> On July 29, 2015, 8:42 a.m., Dan Harvey wrote:
> > samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java,
> >  line 116
> > 
> >
> > Should we add a Samza-specific message, then add the whole exception, so 
> > it's clearer where the exception came from if the user doesn't know the 
> > code? `Logger.info("Failed to index message in ElasticSearch.", e);`
> > 
> > This would also be true for other log lines added.

Good idea.  Thanks.

BTW, it didn't work like this: Logger.info("Failed to index message in 
ElasticSearch.", itemResp.getFailure()) so I did this:

LOGGER.error("Failed to index document in Elasticsearch: " + 
itemResp.getFailureMessage());
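
The pattern settled on above, a fixed Samza-side prefix followed by the failure text, can be sketched as below. It uses java.util.logging only so that the example is self-contained. The logging library actually used by ElasticsearchSystemProducer is not shown in this thread, and the class and method names here are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper; not part of the patch under review.
class IndexFailureLogSketch {
    private static final Logger LOGGER =
        Logger.getLogger(IndexFailureLogSketch.class.getName());

    // Prefixes the raw failure text so the reader can tell which component logged it.
    static String describeFailure(String failureMessage) {
        return "Failed to index document in Elasticsearch: " + failureMessage;
    }

    static void logFailure(String failureMessage) {
        LOGGER.log(Level.SEVERE, describeFailure(failureMessage));
    }

    public static void main(String[] args) {
        logFailure("version conflict, current [9], provided [5]");
    }
}
```

Keeping the message construction in its own method makes the wording easy to check in a unit test without capturing log output.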


- Roger


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/#review93413
---


On July 29, 2015, 6:22 a.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36815/
> ---
> 
> (Updated July 29, 2015, 6:22 a.m.)
> 
> 
> Review request for samza and Dan Harvey.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-741 Add support for versioning to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  f61bd36 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  e3b635b 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
>  afe0eee 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
>  980964f 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
>  684d7f6 
> 
> Diff: https://reviews.apache.org/r/36815/diff/
> 
> 
> Testing
> ---
> 
> Refactored DefaultIndexRequestFactory to make it easier to subclass and 
> customize to handle version and version_type parameters.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

2015-07-29 Thread Yan Fang


> On July 29, 2015, 2:45 p.m., Robert Zuljevic wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala,
> >  lines 121-125
> > 
> >
> > Did you mean something like this?
> > 
> > for ((storeName, systemStream) <- changeLogSystemStreams) {
> >   val systemAdmin = Util.getObj[SystemFactory](config
> > .getSystemFactory(systemStream.getSystem)
> > .getOrElse(throw new SamzaException("A stream uses system %s, 
> > which is missing from the configuration." format systemStream.getSystem))
> > ).getAdmin(systemStream.getSystem, config)
> > 
> >   systemAdmin.createChangelogStream(systemStream.getStream, 
> > changeLogPartitions)
> > }
> > 
> > This is the only way I could think of to simplify this. I don't 
> > think what you posted would work, because you're using String's map 
> > function, but it did steer me in the right direction. Do you agree?

yes, this is correct. :) Thanks.
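
For readers more at home in Java, the lookup-or-fail step in the loop quoted above maps onto Optional.orElseThrow. This is a sketch only: the plain Map stands in for Samza's Config, the class and method names are hypothetical, and the property key follows the systems.<name>.samza.factory convention from the configuration table.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the config lookup; not code from the patch.
class SystemFactoryLookupSketch {
    // Returns the configured factory class name, or fails with the same message as the Scala loop.
    static String factoryClassFor(Map<String, String> config, String systemName) {
        return Optional.ofNullable(config.get("systems." + systemName + ".samza.factory"))
            .orElseThrow(() -> new IllegalStateException(String.format(
                "A stream uses system %s, which is missing from the configuration.",
                systemName)));
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("systems.kafka.samza.factory",
            "org.apache.samza.system.kafka.KafkaSystemFactory");
        // prints org.apache.samza.system.kafka.KafkaSystemFactory
        System.out.println(factoryClassFor(config, "kafka"));
    }
}
```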


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93454
---


On July 9, 2015, 2:39 p.m., Robert Zuljevic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36163/
> ---
> 
> (Updated July 9, 2015, 2:39 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Removed trailing whitespaces
> 
> 
> Diffs
> -
> 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
> 7a588ebc99b5f07d533e48e10061a3075a63665a 
>   
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
>  249b8ae3a904716ea51a2b27c7701ac30d13b854 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> aeba61a95371faaba23c97d896321b8d95467f87 
>   
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
>  097f41062f3432ae9dc9a9737b48ed7b2f709f20 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
> 8d54c4639fc226b34e64915935c1d90e5917af2e 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
>  d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
>  35086f54f526d5d88ad3bc312b71fce40260e7c6 
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
> b063366f0f60e401765a000fa265c59dee4a461e 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
>  1e936b42a5b9a4bfb43766c17847b2947ebdb21d 
> 
> Diff: https://reviews.apache.org/r/36163/diff/
> 
> 
> Testing
> ---
> 
> I wasn't really sure what kind of test (unit test / integration test) I 
> should make here, so any pointers would be greatly appreciated! I tested the 
> change with the unit/integration tests already available.
> 
> 
> Thanks,
> 
> Robert Zuljevic
> 
>



Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

2015-07-29 Thread Yan Fang


> On July 25, 2015, 1:33 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala,
> >  lines 121-125
> > 
> >
> > this can be simplified a little:
> > 
> > for ((storeName, systemStream) <- changeLogSystemStreams) {
> >   val systemAdmin = config
> > .getSystemFactory(systemStream.getName)
> > .getOrElse(throw new SamzaException("A stream uses system %s, 
> > which is missing from the configuration." format 
> > systemName)).map(Util.getObj[SystemFactory](_)).getOrElse(systemStream.getSystem,
> >   throw new SamzaException("Unable to get systemAdmin for store 
> > " + storeName + " and systemStream" + systemStream))
> >   
> >   
> > Then  do not need line 104-109, line 117-119.
> 
> Robert Zuljevic wrote:
> Did you mean something like this?
> 
> for ((storeName, systemStream) <- changeLogSystemStreams) {
>   val systemAdmin = Util.getObj[SystemFactory](config
> .getSystemFactory(systemStream.getSystem)
> .getOrElse(throw new SamzaException("A stream uses system %s, 
> which is missing from the configuration." format systemStream.getSystem))
> ).getAdmin(systemStream.getSystem, config)
> 
>   systemAdmin.createChangelogStream(systemStream.getStream, 
> changeLogPartitions)
> }
> 
> 
> This is the only way I could think of to simplify this. I don't 
> think what you posted would work, because you're using String's map function, 
> but it did steer me in the right direction. Do you agree?

yes, you are right. This is what I was thinking. Not tested the code though. :)


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93019
---


On July 9, 2015, 2:39 p.m., Robert Zuljevic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36163/
> ---
> 
> (Updated July 9, 2015, 2:39 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Removed trailing whitespaces
> 
> 
> Diffs
> -
> 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
> 7a588ebc99b5f07d533e48e10061a3075a63665a 
>   
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
>  249b8ae3a904716ea51a2b27c7701ac30d13b854 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> aeba61a95371faaba23c97d896321b8d95467f87 
>   
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
>  097f41062f3432ae9dc9a9737b48ed7b2f709f20 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
> 8d54c4639fc226b34e64915935c1d90e5917af2e 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
>  d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
>  35086f54f526d5d88ad3bc312b71fce40260e7c6 
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
> b063366f0f60e401765a000fa265c59dee4a461e 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
>  1e936b42a5b9a4bfb43766c17847b2947ebdb21d 
> 
> Diff: https://reviews.apache.org/r/36163/diff/
> 
> 
> Testing
> ---
> 
> I wasn't really sure what kind of test (unit test / integration test) I 
> should make here, so any pointers would be greatly appreciated! I tested the 
> change with the unit/integration tests already available.
> 
> 
> Thanks,
> 
> Robert Zuljevic
> 
>



Re: Required vs. optional methods for KeyValueStore

2015-07-29 Thread Navina Ramesh
Ken,

range() and all() are used as needed by the tasks, and not by the
framework. However, if these features are skipped, it should be made clear
to the user that these operations are not supported for your store.

Also, you might have to update some tests in samza-test, which runs a fixed
set of tests on all types of key-value store supported by samza.

I agree that we can add more javadocs indicating which methods are
mandatory while implementing a KV store.
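
To make the split concrete, here is a rough sketch of a store that supports
the operations needed for changelog restore and rejects the iterator-based
ones explicitly. MiniKeyValueStore is a hypothetical, simplified stand-in
written for this example; it is not Samza's actual KeyValueStore interface,
and the in-memory map only takes the place of a real backend such as Solr.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for a key-value store interface.
// It is NOT Samza's real KeyValueStore; names and signatures are assumptions.
trait MiniKeyValueStore[K, V] {
  def get(key: K): Option[V]
  def put(key: K, value: V): Unit
  def putAll(entries: Seq[(K, V)]): Unit
  def delete(key: K): Unit
  def range(from: K, to: K): Iterator[(K, V)]
  def all(): Iterator[(K, V)]
}

// Supports point reads/writes and bulk writes (the path used when restoring
// from a changelog), but refuses the iterator-based methods.
class PointLookupStore[K, V] extends MiniKeyValueStore[K, V] {
  private val data = mutable.Map.empty[K, V]

  def get(key: K): Option[V] = data.get(key)

  def put(key: K, value: V): Unit = data(key) = value

  def putAll(entries: Seq[(K, V)]): Unit =
    entries.foreach { case (k, v) => put(k, v) }

  def delete(key: K): Unit = {
    data.remove(key)
    ()
  }

  // Fail loudly so a task that relies on iteration finds out immediately.
  def range(from: K, to: K): Iterator[(K, V)] =
    throw new UnsupportedOperationException("range() is not supported by this store")

  def all(): Iterator[(K, V)] =
    throw new UnsupportedOperationException("all() is not supported by this store")
}
```

Throwing UnsupportedOperationException, rather than returning an empty
iterator, is what makes the missing feature visible to the user.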

Cheers!
Navina

On Wed, Jul 29, 2015 at 10:58 AM, Ken Krugler 
wrote:

> Hi Navina,
>
> Thanks for confirming that putAll(list) is a required method, for
> supporting the changelog functionality.
>
> I'm hoping you or others can confirm that range() and all() are _not_ used
> by the Samza system - i.e. these are only used internally (as needed) by
> tasks.
>
> And if the above is true, then adding some Javadoc notes about which
> methods are required (used by the Samza system) for changelog support vs.
> optional (only used by task-specific code as needed) would be very helpful.
>
> Thanks!
>
> -- Ken
>
> > From: Navina Ramesh
> > Sent: July 29, 2015 10:38:45am PDT
> > To: dev@samza.apache.org
> > Subject: Re: Required vs. optional methods for KeyValueStore
> >
> > Hi Ken,
> >
> > We use putAll(list) when restoring from changelog. So, unless you don't
> > want your store to have support for changelog, the implementation is
> > required.
> >
> > I only have a high-level overview of what Solr is. Perhaps, others on the
> > mailing list have experience with Solr and can provide more useful
> > information.
> >
> > Thanks!
> > Navina
> >
> > On Tue, Jul 28, 2015 at 5:30 PM, Ken Krugler <
> kkrugler_li...@transpac.com>
> > wrote:
> >
> >> Hi all,
> >>
> >> I'm looking at using embedded Solr as the KeyValueStore, as that lets me
> >> extract ranked results from the state to publish as part of the task's
> >> operation.
> >>
> >> Some of the methods defined by KeyValueStore are problematic, though -
> >> specifically the range() and all() methods that return iterators.
> >>
> >> Iterating over lots of results in Solr, while more feasible with newer
> >> paging support, is still an abuse of its architecture :)
> >>
> >> So I'm wondering whether I need to support those methods, or are they only
> >> called internally by tasks (e.g. my task) and thus can be optional.
> >>
> >> I'm assuming that when state is being automatically restored from a
> >> changelog, the Samza system is calling putAll(list) repeatedly, but I
> >> haven't dug into those details. So that would be an example of a required
> >> method.
> >>
> >> Thanks,
> >>
> >> -- Ken
>
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr


-- 
Navina R.


RE: Required vs. optional methods for KeyValueStore

2015-07-29 Thread Ken Krugler
Hi Navina,

Thanks for confirming that putAll(list) is a required method, for supporting 
the changelog functionality.

I'm hoping you or others can confirm that range() and all() are _not_ used by 
the Samza system - i.e. these are only used internally (as needed) by tasks.

And if the above is true, then adding some Javadoc notes about which methods 
are required (used by the Samza system) for changelog support vs. optional 
(only used by task-specific code as needed) would be very helpful.

Thanks!

-- Ken

> From: Navina Ramesh
> Sent: July 29, 2015 10:38:45am PDT
> To: dev@samza.apache.org
> Subject: Re: Required vs. optional methods for KeyValueStore
> 
> Hi Ken,
> 
> We use putAll(list) when restoring from changelog. So, unless you don't
> want your store to have support for changelog, the implementation is
> required.
> 
> I only have a high-level overview of what Solr is. Perhaps, others on the
> mailing list have experience with Solr and can provide more useful
> information.
> 
> Thanks!
> Navina
> 
> On Tue, Jul 28, 2015 at 5:30 PM, Ken Krugler 
> wrote:
> 
>> Hi all,
>> 
>> I'm looking at using embedded Solr as the KeyValueStore, as that lets me
>> extract ranked results from the state to publish as part of the task's
>> operation.
>> 
>> Some of the methods defined by KeyValueStore are problematic, though -
>> specifically the range() and all() methods that return iterators.
>> 
>> Iterating over lots of results in Solr, while more feasible with newer
>> paging support, is still an abuse of its architecture :)
>> 
>> So I'm wondering whether I need to support those methods, or are they only
>> called internally by tasks (e.g. my task) and thus can be optional.
>> 
>> I'm assuming that when state is being automatically restored from a
>> changelog, the Samza system is calling putAll(list) repeatedly, but I
>> haven't dug into those details. So that would be an example of a required
>> method.
>> 
>> Thanks,
>> 
>> -- Ken


--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr







Re: Review Request 36905: SAMZA-745 elasticsearch module has Javadoc warning

2015-07-29 Thread Yi Pan (Data Infrastructure)

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36905/#review93479
---

Ship it!


Ship It!

- Yi Pan (Data Infrastructure)


On July 29, 2015, 9:10 a.m., Aleksandar Pejakovic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36905/
> ---
> 
> (Updated July 29, 2015, 9:10 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Quick fix for javadoc.
> 
> 
> Diffs
> -
> 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
>  d8ca70e 
> 
> Diff: https://reviews.apache.org/r/36905/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Aleksandar Pejakovic
> 
>



Re: Required vs. optional methods for KeyValueStore

2015-07-29 Thread Navina Ramesh
Hi Ken,

We use putAll(list) when restoring from changelog. So, unless you don't
want your store to have support for changelog, the implementation is
required.

I only have a high-level overview of what Solr is. Perhaps, others on the
mailing list have experience with Solr and can provide more useful
information.

Thanks!
Navina

On Tue, Jul 28, 2015 at 5:30 PM, Ken Krugler 
wrote:

> Hi all,
>
> I'm looking at using embedded Solr as the KeyValueStore, as that lets me
> extract ranked results from the state to publish as part of the task's
> operation.
>
> Some of the methods defined by KeyValueStore are problematic, though -
> specifically the range() and all() methods that return iterators.
>
> Iterating over lots of results in Solr, while more feasible with newer
> paging support, is still an abuse of its architecture :)
>
> So I'm wondering whether I need to support those methods, or are they only
> called internally by tasks (e.g. my task) and thus can be optional.
>
> I'm assuming that when state is being automatically restored from a
> changelog, the Samza system is calling putAll(list) repeatedly, but I
> haven't dug into those details. So that would be an example of a required
> method.
>
> Thanks,
>
> -- Ken
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>
>
>
>
>


-- 
Navina R.


Re: Review Request 36545: SAMZA-682 Refactor Coordinator stream messages

2015-07-29 Thread Navina Ramesh


> On July 24, 2015, 6:01 p.m., Navina Ramesh wrote:
> > Thanks for picking this up! It feels good to look at refactored code. 
> > 
> > One suggestion: Please run all the integration tests (including the zopkio 
> > tests) before checking in this patch. I don't think we cleanly start and 
> > stop coordinator stream producers/consumers in all the managers. Please 
> > verify that nothing is broken due to this change.
> 
> József Márton Jung wrote:
> I have difficulties running the zopkio tests. The error message is the 
> following: 
> 2015-07-27 11:36:44,608 zopkio.remote_host_helper [ERROR] Error: 
> JAVA_HOME is not set and could not be found.
> 
> JAVA_HOME is set on my machine (in /etc/profile, so it is available 
> system-wide), and echoing it outputs the path to the Java installation. I 
> don't know what is wrong.

I think the problem is with the remote host (host to which zopkio is trying to 
ssh) not having JAVA_HOME set correctly. Are you running the test on a remote 
machine?


- Navina


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36545/#review92938
---


On July 27, 2015, 10:15 a.m., József Márton Jung wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36545/
> ---
> 
> (Updated July 27, 2015, 10:15 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> The following has been refactored: 
> 1. Static inner classes from CoordinatorStreamMessage have been extracted
> 2. Common functionality from CheckpointManager, ChangelogMappingManager and 
> LocalityManager has been moved to a base class
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml 6654319 
>   samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
> 7445996 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java 
> 55c258f 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
>  e5ab4fb 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
>  b1078bd 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
>  92f8907 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
>  f769756 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/Delete.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetConfig.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java ad6387d 
>   
> samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
>  7d3409c 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> f621611 
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 1c178a6 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
>  e454593 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
>  ac26a01 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
>  c25f6a7 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
>  1ef07d0 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
>  c484660 
>   
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
> 84fdeaa 
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml 41303f7 
> 
> Diff: https://reviews.apache.org/r/36545/diff/
> 
> 
> Testing
> ---
> 
> Tests have been updated.
> 
> 
> Thanks,
> 
> József Márton Jung
> 
>



Re: [DISCUSS] Release 0.10.0

2015-07-29 Thread Navina Ramesh
+1 for the StreamAppender bug fix as mandatory for 0.10.

@Yan: will review SAMZA-676 today and test it out locally.

Thanks!
Navina

On Wed, Jul 29, 2015 at 8:38 AM, Chinmay Soman 
wrote:

> I can take care of SAMZA-340, SAMZA-683  and will follow up with Luis for
> SAMZA-401,2,3,4
>
> On Wed, Jul 29, 2015 at 12:10 AM, Dan  wrote:
>
> > I agree SAMZA-741 for the ElasticSearch producer should be in too so
> we've
> > got a better API as part of that release.
> >
> >  - Dan
> >
> >
> > On 29 July 2015 at 07:51, Yan Fang  wrote:
> >
> > > Actually, I also want to include a few patch-available features,
> > > especially:
> > >
> > > 1. broadcast stream (SAMZA-676)
> > > - waiting for review
> > >
> > > 2. graphite support (SAMZA-340)
> > > 3. meter and histogram (SAMZA-683)
> > > 4. utility (SAMZA-401)
> > > - 2,3,4 belong to Luis, if he does not have time to update, since
> > they
> > > only need some small changes, we can edit it and get +1 from another
> > > committer.
> > >
> > > 5. hdfs producer (SAMZA-693)
> > > - I am reviewing.
> > >
> > > 6. upgrade yarn to 2.7.1 (SAMZA-563)
> > >- though I am reviewing, this ticket is negotiable if we want to put
> > > into the 0.10.0 release. If we do not, I think, when users enable the
> > > worker-persisting and container-persisting features, Samza will not be
> > able
> > > to handle it. (Some classes are only available after yarn 2.5.0 while
> > Samza
> > > currently only support yarn 2.4.0)
> > >
> > > 7. others: scrooge, class loader isolation, etc.
> > > - those are waiting for reviewing too.
> > >
> > > My opinion is that, if we can clean up all the patch-available tickets,
> > it
> > > will be great. Most of them have been already reviewed more than once.
> > So I
> > > think it should not be very time-consuming to have them in the 0.10.0
> > > release.
> > >
> > > What do you think?
> > >
> > > Of course, another must-have is the bug-fix of the Stream Appender. :)
> > >
> > > Thanks,
> > >
> > > Fang, Yan
> > > yanfang...@gmail.com
> > >
> > > On Tue, Jul 28, 2015 at 10:27 PM, Roger Hoover  >
> > > wrote:
> > >
> > > > Thanks, Yi.
> > > >
> > > > I propose that we also include SAMZA-741 for Elasticsearch versioning
> > > > support with the new ES producer.  I think it's very close to being
> > > merged.
> > > >
> > > > Roger
> > > >
> > > >
> > > > On Tue, Jul 28, 2015 at 10:08 PM, Yi Pan 
> wrote:
> > > >
> > > > > Hi, all,
> > > > >
> > > > > I want to start the discussion on the release schedule for 0.10.0.
> > > There
> > > > > are a few important features that we plan to release in 0.10.0 and
> I
> > > want
> > > > > to start this thread s.t. we can agree on what to include in 0.10.0
> > > > > release.
> > > > >
> > > > > There are the following main features added in 0.10.0:
> > > > > - RocksDB TTL support
> > > > > - Add CoordinatorStream and disable CheckpointManager
> > > > > - Elasticsearch Producer
> > > > > - Host affinity
> > > > > And other 0.10.0 tickets:
> > > > >
> > > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SAMZA%20AND%20status%20in%20(%22In%20Progress%22%2C%20Resolved%2C%20Closed)%20AND%20fixVersion%20%3D%200.10.0
> > > > >
> > > > > I propose to cut a 0.10.0 release after we get the following issues
> > > > > resolved:
> > > > > - SAMZA-615: Migrate checkpoint from checkpoint topic to
> Coordinator
> > > > stream
> > > > > - SAMZA-617: YARN host affinity in Samza
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > Thanks!
> > > > >
> > > > > -Yi
> > > > >
> > > >
> > >
> >
>
>
>
> --
> Thanks and regards
>
> Chinmay Soman
>



-- 
Navina R.


Re: changelog compaction problem

2015-07-29 Thread Roger Hoover
You also may want to check if the cleaner thread in the broker is still
alive (using jstack).  I've run into this issue and used the fix mentioned
in the ticket to get compaction working again.

https://issues.apache.org/jira/browse/KAFKA-1641
I'd just like to mention that a possible workaround (depending on your
situation in regard to keys) is to stop the broker, remove the cleaner
offset checkpoint, and then start the broker again for each ISR member in
serial to get the thread running again. Keep in mind that the cleaner will
start from the beginning if you do this.

On Wed, Jul 29, 2015 at 8:43 AM, Chinmay Soman 
wrote:

> Just curious,
>
> Can you double check if you have log compaction enabled on your Kafka
> brokers ?
>
> On Wed, Jul 29, 2015 at 8:30 AM, Vladimir Lebedev  wrote:
>
> > Hello,
> >
> > I have a problem: the changelog in one of my samza jobs grows
> > indefinitely.
> >
> > The job is quite simple, it reads messages from the input kafka topic,
> > and either creates or updates a key in task-local samza store. Once a
> > minute the window method kicks in, iterates over all keys in the store
> > and deletes some of them, selecting on the contents of their value.
> >
> > Message rate in input topic is about 3000 messages per second. The input
> > topic is partitioned in 48 partitions. Average number of keys kept in
> > the store is more or less stable and does not exceed 10000 keys per task.
> > Average size of values is 50 bytes. So I expected that sum of all
> > segments' size in kafka data directory for the job's changelog topic
> > should not exceed 10000*50*48 ~= 24Mbytes. In fact it is more than 2.5GB
> > (after 6 days running from scratch) and it is growing.
> >
> > I tried to change default segment size for changelog topic in kafka, and
> > it worked a bit - instead of 500Mbyte segments I have now 50Mbyte
> > segments, but it did not heal the indefinite data growth problem.
> >
> > Moreover, if I stop the job and start it again it cannot restart, it
> > breaks right after reading all records from changelog topic.
> >
> > Has anybody had a similar problem? How could it be resolved?
> >
> > Best regards,
> > Vladimir
> >
> > --
> > Vladimir Lebedev
> > w...@fastmail.fm
> >
> >
>
>
> --
> Thanks and regards
>
> Chinmay Soman
>


Re: changelog compaction problem

2015-07-29 Thread Chinmay Soman
Just curious,

Can you double check if you have log compaction enabled on your Kafka
brokers ?

On Wed, Jul 29, 2015 at 8:30 AM, Vladimir Lebedev  wrote:

> Hello,
>
> I have a problem: the changelog in one of my samza jobs grows indefinitely.
>
> The job is quite simple, it reads messages from the input kafka topic, and
> either creates or updates a key in task-local samza store. Once a minute
> the window method kicks in, iterates over all keys in the store and
> deletes some of them, selecting on the contents of their value.
>
> Message rate in input topic is about 3000 messages per second. The input
> topic is partitioned in 48 partitions. Average number of keys kept in the
> store is more or less stable and does not exceed 10000 keys per task. Average
> size of values is 50 bytes. So I expected that sum of all segments' size in
> kafka data directory for the job's changelog topic should not exceed
> 10000*50*48 ~= 24Mbytes. In fact it is more than 2.5GB (after 6 days
> running from scratch) and it is growing.
>
> I tried to change default segment size for changelog topic in kafka, and
> it worked a bit - instead of 500Mbyte segments I have now 50Mbyte segments,
> but it did not heal the indefinite data growth problem.
>
> Moreover, if I stop the job and start it again it cannot restart, it
> breaks right after reading all records from changelog topic.
>
> Did somebody have similar problem? How it could be resolved?
>
> Best regards,
> Vladimir
>
> --
> Vladimir Lebedev
> w...@fastmail.fm
>
>


-- 
Thanks and regards

Chinmay Soman


Re: [DISCUSS] Release 0.10.0

2015-07-29 Thread Chinmay Soman
I can take care of SAMZA-340, SAMZA-683  and will follow up with Luis for
SAMZA-401,2,3,4

On Wed, Jul 29, 2015 at 12:10 AM, Dan  wrote:

> I agree SAMZA-741 for the ElasticSearch producer should be in too so we've
> got a better API as part of that release.
>
>  - Dan
>
>
> On 29 July 2015 at 07:51, Yan Fang  wrote:
>
> > Actually, I also want to include a few patch-available features,
> > especially:
> >
> > 1. broadcast stream (SAMZA-676)
> > - waiting for review
> >
> > 2. graphite support (SAMZA-340)
> > 3. meter and histogram (SAMZA-683)
> > 4. utility (SAMZA-401)
> > - 2,3,4 belong to Luis, if he does not have time to update, since
> they
> > only need some small changes, we can edit it and get +1 from another
> > committer.
> >
> > 5. hdfs producer (SAMZA-693)
> > - I am reviewing.
> >
> > 6. upgrade yarn to 2.7.1 (SAMZA-563)
> >- though I am reviewing, this ticket is negotiable if we want to put
> > into the 0.10.0 release. If we do not, I think, when users enable the
> > worker-persisting and container-persisting features, Samza will not be
> able
> > to handle it. (Some classes are only available after yarn 2.5.0 while
> Samza
> > currently only support yarn 2.4.0)
> >
> > 7. others: scrooge, class loader isolation, etc.
> > - those are waiting for reviewing too.
> >
> > My opinion is that, if we can clean up all the patch-available tickets,
> it
> > will be great. Most of them have been already reviewed more than once.
> So I
> > think it should not be very time-consuming to have them in the 0.10.0
> > release.
> >
> > What do you think?
> >
> > Of course, another must-have is the bug-fix of the Stream Appender. :)
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Tue, Jul 28, 2015 at 10:27 PM, Roger Hoover 
> > wrote:
> >
> > > Thanks, Yi.
> > >
> > > I propose that we also include SAMZA-741 for Elasticsearch versioning
> > > support with the new ES producer.  I think it's very close to being
> > merged.
> > >
> > > Roger
> > >
> > >
> > > On Tue, Jul 28, 2015 at 10:08 PM, Yi Pan  wrote:
> > >
> > > > Hi, all,
> > > >
> > > > I want to start the discussion on the release schedule for 0.10.0.
> > There
> > > > are a few important features that we plan to release in 0.10.0 and I
> > want
> > > > to start this thread s.t. we can agree on what to include in 0.10.0
> > > > release.
> > > >
> > > > There are the following main features added in 0.10.0:
> > > > - RocksDB TTL support
> > > > - Add CoordinatorStream and disable CheckpointManager
> > > > - Elasticsearch Producer
> > > > - Host affinity
> > > > And other 0.10.0 tickets:
> > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SAMZA%20AND%20status%20in%20(%22In%20Progress%22%2C%20Resolved%2C%20Closed)%20AND%20fixVersion%20%3D%200.10.0
> > > >
> > > > I propose to cut a 0.10.0 release after we get the following issues
> > > > resolved:
> > > > - SAMZA-615: Migrate checkpoint from checkpoint topic to Coordinator
> > > stream
> > > > - SAMZA-617: YARN host affinity in Samza
> > > >
> > > > Thoughts?
> > > >
> > > > Thanks!
> > > >
> > > > -Yi
> > > >
> > >
> >
>



-- 
Thanks and regards

Chinmay Soman


changelog compaction problem

2015-07-29 Thread Vladimir Lebedev

Hello,

I have a problem: the changelog in one of my samza jobs grows indefinitely.

The job is quite simple, it reads messages from the input kafka topic, 
and either creates or updates a key in task-local samza store. Once a 
minute the window method kicks in, iterates over all keys in the 
store and deletes some of them, selecting on the contents of their value.


Message rate in input topic is about 3000 messages per second. The input 
topic is partitioned in 48 partitions. Average number of keys kept in 
the store is more or less stable and does not exceed 10000 keys per task. 
Average size of values is 50 bytes. So I expected that sum of all 
segments' size in kafka data directory for the job's changelog topic 
should not exceed 10000*50*48 ~= 24Mbytes. In fact it is more than 2.5GB 
(after 6 days running from scratch) and it is growing.
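
For reference, the estimate can be written out as a small calculation. This
is only a sketch: it assumes 10000 keys per task (the figure that the 24
Mbyte total implies), counts value bytes only, and ignores key bytes and
per-message overhead in the log, so the real compacted size would be
somewhat larger. The object and parameter names are made up for the example.

```scala
// Back-of-the-envelope size of a fully compacted changelog:
// one latest value per live key, per task, across all partitions.
object ChangelogEstimate {
  def expectedBytes(keysPerTask: Long, avgValueBytes: Long, partitions: Long): Long =
    keysPerTask * avgValueBytes * partitions

  def main(args: Array[String]): Unit = {
    // Assumed inputs: 10000 keys per task, 50-byte values, 48 partitions.
    val bytes = expectedBytes(keysPerTask = 10000L, avgValueBytes = 50L, partitions = 48L)
    println(s"expected compacted size: ${bytes / 1000000.0} MB")
  }
}
```

With these inputs the result is 24,000,000 bytes, about two orders of
magnitude below the 2.5GB observed on disk.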


I tried to change default segment size for changelog topic in kafka, and 
it worked a bit - instead of 500Mbyte segments I have now 50Mbyte 
segments, but it did not heal the indefinite data growth problem.


Moreover, if I stop the job and start it again it cannot restart, it 
breaks right after reading all records from changelog topic.


Has anybody had a similar problem? How could it be resolved?

Best regards,
Vladimir

--
Vladimir Lebedev
w...@fastmail.fm



Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

2015-07-29 Thread Robert Zuljevic


> On July 25, 2015, 1:33 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala,
> >  lines 121-125
> > 
> >
> > this can be simplified a little:
> > 
> > for ((storeName, systemStream) <- changeLogSystemStreams) {
> >   val systemAdmin = config
> > .getSystemFactory(systemStream.getName)
> > .getOrElse(throw new SamzaException("A stream uses system %s, 
> > which is missing from the configuration." format 
> > systemName)).map(Util.getObj[SystemFactory](_)).getOrElse(systemStream.getSystem,
> >   throw new SamzaException("Unable to get systemAdmin for store 
> > " + storeName + " and systemStream" + systemStream))
> >   
> >   
> > Then  do not need line 104-109, line 117-119.

Did you mean something like this?

for ((storeName, systemStream) <- changeLogSystemStreams) {
  val systemAdmin = Util.getObj[SystemFactory](config
    .getSystemFactory(systemStream.getSystem)
    .getOrElse(throw new SamzaException(
      "A stream uses system %s, which is missing from the configuration." format systemStream.getSystem))
  ).getAdmin(systemStream.getSystem, config)

  systemAdmin.createChangelogStream(systemStream.getStream, changeLogPartitions)
}


This is the only way I could think of to simplify this. I don't think what 
you posted would work, because you're using String's map function, but it did 
steer me in the right direction. Do you agree?
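
To illustrate the point about String's map: once getOrElse has been called
on an Option[String], the result is a plain String, so a following .map
iterates over characters rather than transforming the optional value. A
minimal sketch of the two orderings that do work, using hypothetical
stand-ins (a Map for the config, a trivial Factory class and a load helper
in place of Util.getObj); none of these names come from the actual patch:

```scala
// Hypothetical stand-ins for illustration only.
class MissingSystemException(msg: String) extends RuntimeException(msg)
final case class Factory(className: String)

object FactoryLookup {
  // Looks up the factory class name configured for a system, if any.
  def factoryClassFor(config: Map[String, String], system: String): Option[String] =
    config.get(s"systems.$system.samza.factory")

  // Stand-in for instantiating the class by name.
  def load(className: String): Factory = Factory(className)

  private def missing(system: String): Nothing =
    throw new MissingSystemException(
      "A stream uses system %s, which is missing from the configuration.".format(system))

  // Ordering 1: unwrap the Option first, then pass the String on.
  def unwrapThenLoad(config: Map[String, String], system: String): Factory =
    load(factoryClassFor(config, system).getOrElse(missing(system)))

  // Ordering 2: map inside the Option, then unwrap.
  def mapThenUnwrap(config: Map[String, String], system: String): Factory =
    factoryClassFor(config, system).map(load).getOrElse(missing(system))
}
```

Both orderings produce the same result; the snippet above follows the first
one, wrapping the getOrElse result in the call that builds the factory.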


- Robert


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93019
---


On July 9, 2015, 2:39 p.m., Robert Zuljevic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36163/
> ---
> 
> (Updated July 9, 2015, 2:39 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Removed trailing whitespaces
> 
> 
> Diffs
> -
> 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
> 7a588ebc99b5f07d533e48e10061a3075a63665a 
>   
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
>  249b8ae3a904716ea51a2b27c7701ac30d13b854 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> aeba61a95371faaba23c97d896321b8d95467f87 
>   
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
>  097f41062f3432ae9dc9a9737b48ed7b2f709f20 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
> 8d54c4639fc226b34e64915935c1d90e5917af2e 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
>  d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
>  35086f54f526d5d88ad3bc312b71fce40260e7c6 
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
> b063366f0f60e401765a000fa265c59dee4a461e 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
>  1e936b42a5b9a4bfb43766c17847b2947ebdb21d 
> 
> Diff: https://reviews.apache.org/r/36163/diff/
> 
> 
> Testing
> ---
> 
> I wasn't really sure what kind of test (unit test / integration test) I 
> should make here, so any pointers would be greatly appreciated! I tested the 
> change with the unit/integration tests already available.
> 
> 
> Thanks,
> 
> Robert Zuljevic
> 
>



Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

2015-07-29 Thread Robert Zuljevic

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93454
---



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(lines 121 - 125)


Did you mean something like this?

for ((storeName, systemStream) <- changeLogSystemStreams) {
  val systemAdmin = Util.getObj[SystemFactory](config
    .getSystemFactory(systemStream.getSystem)
    .getOrElse(throw new SamzaException(
      "A stream uses system %s, which is missing from the configuration."
        format systemStream.getSystem))
  ).getAdmin(systemStream.getSystem, config)

  systemAdmin.createChangelogStream(systemStream.getStream, changeLogPartitions)
}

This is the only way I could think of to simplify this. I don't think 
what you posted would work, because you're using String's map function, but it 
did steer me in the right direction. Do you agree?
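
For readers more at home in Java, the loop above can be sketched roughly as 
follows. This is only an illustration: StreamRef, AdminSketch and 
ChangelogCreationSketch are hypothetical stand-ins, not Samza's SystemStream, 
SystemAdmin or JobCoordinator, and the admin lookup is reduced to a plain map. 
The point is the shape of the code: one pass over the store-to-stream mapping, 
fail fast if a system is not configured, then create the changelog stream.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a (system, stream) pair; not Samza's SystemStream.
final class StreamRef {
    final String system;
    final String stream;

    StreamRef(String system, String stream) {
        this.system = system;
        this.stream = stream;
    }
}

// Hypothetical stand-in for the admin interface; not Samza's SystemAdmin.
interface AdminSketch {
    void createChangelogStream(String streamName, int partitions);
}

final class ChangelogCreationSketch {
    // Creates one changelog stream per store and fails fast when a stream
    // refers to a system that has no configured admin.
    static void createAll(Map<String, StreamRef> changelogs,
                          Map<String, AdminSketch> adminsBySystem,
                          int partitions) {
        for (Map.Entry<String, StreamRef> entry : changelogs.entrySet()) {
            StreamRef ref = entry.getValue();
            AdminSketch admin = Optional.ofNullable(adminsBySystem.get(ref.system))
                .orElseThrow(() -> new IllegalStateException(String.format(
                    "A stream uses system %s, which is missing from the configuration.",
                    ref.system)));
            admin.createChangelogStream(ref.stream, partitions);
        }
    }

    public static void main(String[] args) {
        List<String> created = new ArrayList<>();
        Map<String, AdminSketch> admins = new LinkedHashMap<>();
        // The admin here only records what it was asked to create.
        admins.put("kafka", (name, parts) -> created.add(name + ":" + parts));

        Map<String, StreamRef> changelogs = new LinkedHashMap<>();
        changelogs.put("my-store", new StreamRef("kafka", "my-store-changelog"));

        createAll(changelogs, admins, 4);
        System.out.println(created);
    }
}
```

Failing inside the loop keeps the error message tied to the specific system 
name, which is what the getOrElse(throw ...) in the Scala version does.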


- Robert Zuljevic


On July 9, 2015, 2:39 p.m., Robert Zuljevic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36163/
> ---
> 
> (Updated July 9, 2015, 2:39 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Removed trailing whitespaces
> 
> 
> Diffs
> -
> 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
> 7a588ebc99b5f07d533e48e10061a3075a63665a 
>   
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
>  249b8ae3a904716ea51a2b27c7701ac30d13b854 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> aeba61a95371faaba23c97d896321b8d95467f87 
>   
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
>  097f41062f3432ae9dc9a9737b48ed7b2f709f20 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
> 8d54c4639fc226b34e64915935c1d90e5917af2e 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
>  d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
>  35086f54f526d5d88ad3bc312b71fce40260e7c6 
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
> b063366f0f60e401765a000fa265c59dee4a461e 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
>  1e936b42a5b9a4bfb43766c17847b2947ebdb21d 
> 
> Diff: https://reviews.apache.org/r/36163/diff/
> 
> 
> Testing
> ---
> 
> I wasn't really sure what kind of test (unit test / integration test) I 
> should make here, so any pointers would be greatly appreciated! I tested the 
> change with the unit/integration tests already available.
> 
> 
> Thanks,
> 
> Robert Zuljevic
> 
>



Re: Review Request 36905: SAMZA-745 elasticsearch module has Javadoc warning

2015-07-29 Thread Dan Harvey

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36905/#review93416
---


Looks good.

- Dan Harvey


On July 29, 2015, 9:10 a.m., Aleksandar Pejakovic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36905/
> ---
> 
> (Updated July 29, 2015, 9:10 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Quick fix for javadoc.
> 
> 
> Diffs
> -
> 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
>  d8ca70e 
> 
> Diff: https://reviews.apache.org/r/36905/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Aleksandar Pejakovic
> 
>



Review Request 36905: SAMZA-745 elasticsearch module has Javadoc warning

2015-07-29 Thread Aleksandar Pejakovic

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36905/
---

Review request for samza.


Repository: samza


Description
---

Quick fix for javadoc.


Diffs
-

  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
 d8ca70e 

Diff: https://reviews.apache.org/r/36905/diff/


Testing
---


Thanks,

Aleksandar Pejakovic



Re: Review Request 36815: SAMZA-741 Support for versioning with Elasticsearch Producer

2015-07-29 Thread Dan Harvey

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36815/#review93413
---



samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 (line 116)


Should we add a Samza-specific message and then pass the whole exception, so 
it's clearer where the exception came from if the user doesn't know the code? 
`Logger.info("Failed to index message in ElasticSearch.", e);`

This would also be true for other log lines added.
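
To make the suggestion concrete, here is a minimal, self-contained sketch of 
the pattern. It uses java.util.logging only so that it runs without extra 
dependencies; the producer's actual logging framework may differ. The class 
name, the describeFailure helper and the index parameter are hypothetical and 
are not part of ElasticsearchSystemProducer.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical illustration class; not part of samza-elasticsearch.
final class IndexFailureLoggingSketch {
    private static final Logger LOGGER =
        Logger.getLogger(IndexFailureLoggingSketch.class.getName());

    // Builds a Samza-specific message so the log line is understandable
    // without reading the producer code. The index name is an assumed detail.
    static String describeFailure(String index) {
        return "Failed to index message in Elasticsearch (index: " + index + ").";
    }

    // Logs the contextual message and passes the exception itself, so the
    // full stack trace is kept rather than only e.getMessage().
    static void logFailure(String index, Throwable cause) {
        LOGGER.log(Level.INFO, describeFailure(index), cause);
    }

    public static void main(String[] args) {
        logFailure("samza-events", new RuntimeException("version conflict"));
    }
}
```

Passing the Throwable as a separate argument, rather than concatenating it 
into the message, is what keeps the stack trace in the output.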


One comment on the logging but I think this looks great otherwise.

- Dan Harvey


On July 29, 2015, 6:22 a.m., Roger Hoover wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36815/
> ---
> 
> (Updated July 29, 2015, 6:22 a.m.)
> 
> 
> Review request for samza and Dan Harvey.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-741 Add support for versioning to Elasticsearch System Producer
> 
> 
> Diffs
> -
> 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
>  f61bd36 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
>  e3b635b 
>   
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
>  afe0eee 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
>  980964f 
>   
> samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
>  684d7f6 
> 
> Diff: https://reviews.apache.org/r/36815/diff/
> 
> 
> Testing
> ---
> 
> Refactored DefaultIndexRequestFactory to make it easier to subclass and 
> customize to handle version and version_type parameters.
> 
> 
> Thanks,
> 
> Roger Hoover
> 
>



Re: Review Request 36727: SAMZA-563 Upgrade Samza to YARN 2.6.0

2015-07-29 Thread Aleksandar Pejakovic

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36727/
---

(Updated July 29, 2015, 8:39 a.m.)


Review request for samza.


Changes
---

Fixed test in samza-yarn -> TestSamzaAppMaster -> testAppMasterShouldReboot
  - changed allocate method in TestAMRMClientImpl to throw 
ApplicationAttemptNotFoundException if reboot is set to true
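
The shape of that test change can be sketched roughly as below. This is not 
the code in the diff: RebootingClientStub and AttemptNotFoundSketchException 
are hypothetical stand-ins for TestAMRMClientImpl and YARN's 
ApplicationAttemptNotFoundException, and allocate returns a simple call count 
instead of a real allocate response.

```java
// Hypothetical stand-in for YARN's ApplicationAttemptNotFoundException.
final class AttemptNotFoundSketchException extends RuntimeException {
    AttemptNotFoundSketchException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for the test client; not the real TestAMRMClientImpl.
final class RebootingClientStub {
    private final boolean reboot;
    private int allocateCalls = 0;

    RebootingClientStub(boolean reboot) {
        this.reboot = reboot;
    }

    // Mimics allocate(): throws when the stub is configured to simulate a
    // reboot, otherwise returns how many times it has been called.
    int allocate(float progress) {
        allocateCalls++;
        if (reboot) {
            throw new AttemptNotFoundSketchException("application attempt not found");
        }
        return allocateCalls;
    }

    public static void main(String[] args) {
        RebootingClientStub healthy = new RebootingClientStub(false);
        System.out.println("healthy allocate calls: " + healthy.allocate(0.5f));
        try {
            new RebootingClientStub(true).allocate(0.5f);
        } catch (AttemptNotFoundSketchException e) {
            System.out.println("reboot signalled: " + e.getMessage());
        }
    }
}
```

A test can then assert that the app master reacts to the exception instead of 
waiting for a resync command.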


Repository: samza


Description
---

Upgraded version of yarn in samza.


Diffs (updated)
-

  bin/check-all.sh 67bf776 
  docs/contribute/tests.md f485ce2 
  docs/learn/tutorials/versioned/run-in-multi-node-yarn.md 312efaf 
  gradle/dependency-versions.gradle fb06e8e 
  samza-test/src/main/python/configs/downloads.json a75756f 
  samza-test/src/main/python/configs/yarn.json 9b0143d 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala 
7b7d86a 
  
samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
 df5992e 
  
samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
 1e936b4 

Diff: https://reviews.apache.org/r/36727/diff/


Testing
---

All tests pass except one in samza-yarn -> TestSamzaAppMaster -> 
testAppMasterShouldReboot.
  - because of [YARN-1365](https://issues.apache.org/jira/browse/YARN-1365), 
[YARN-1366](https://issues.apache.org/jira/browse/YARN-1366) and 
[YARN-1367](https://issues.apache.org/jira/browse/YARN-1367), 
AMRMClientAsyncImpl.java -> private class HeartbeatThread changed: its run() 
method no longer catches ApplicationMasterNotRegisteredException (the 
replacement for AMCommand.AM_RESYNC) and no longer resyncs.

Without that test everything works just fine. Tried hello-samza, works fine.


Thanks,

Aleksandar Pejakovic



Re: [DISCUSS] Release 0.10.0

2015-07-29 Thread Dan
I agree SAMZA-741 for the ElasticSearch producer should be in too so we've
got a better API as part of that release.

 - Dan


On 29 July 2015 at 07:51, Yan Fang wrote:

> Actually, I also want to include a few patch-available features,
> especially:
>
> 1. broadcast stream (SAMZA-676)
> - waiting for review
>
> 2. graphite support (SAMZA-340)
> 3. meter and histogram (SAMZA-683)
> 4. utility (SAMZA-401)
> - 2, 3, and 4 belong to Luis. If he does not have time to update them, we
> can edit them ourselves, since they only need some small changes, and get
> a +1 from another committer.
>
> 5. hdfs producer (SAMZA-693)
> - I am reviewing.
>
> 6. upgrade yarn to 2.7.1 (SAMZA-563)
> - though I am reviewing it, whether to put this ticket into the 0.10.0
> release is negotiable. If we do not, I think Samza will not be able to
> handle users enabling the worker-persisting and container-persisting
> features. (Some classes are only available after yarn 2.5.0, while Samza
> currently only supports yarn 2.4.0.)
>
> 7. others: scrooge, class loader isolation, etc.
> - those are waiting for reviewing too.
>
> My opinion is that, if we can clean up all the patch-available tickets, it
> will be great. Most of them have been already reviewed more than once. So I
> think it should not be very time-consuming to have them in the 0.10.0
> release.
>
> What do you think?
>
> Of course, another must-have is the bug-fix of the Stream Appender. :)
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Tue, Jul 28, 2015 at 10:27 PM, Roger Hoover 
> wrote:
>
> > Thanks, Yi.
> >
> > I propose that we also include SAMZA-741 for Elasticsearch versioning
> > support with the new ES producer.  I think it's very close to being merged.
> >
> > Roger
> >
> >
> > On Tue, Jul 28, 2015 at 10:08 PM, Yi Pan wrote:
> >
> > > Hi, all,
> > >
> > > I want to start the discussion on the release schedule for 0.10.0. There
> > > are a few important features that we plan to release in 0.10.0 and I want
> > > to start this thread s.t. we can agree on what to include in 0.10.0
> > > release.
> > >
> > > There are the following main features added in 0.10.0:
> > > - RocksDB TTL support
> > > - Add CoordinatorStream and disable CheckpointManager
> > > - Elasticsearch Producer
> > > - Host affinity
> > > And other 0.10.0 tickets:
> > >
> > >
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SAMZA%20AND%20status%20in%20(%22In%20Progress%22%2C%20Resolved%2C%20Closed)%20AND%20fixVersion%20%3D%200.10.0
> > >
> > > I propose to cut a 0.10.0 release after we get the following issues
> > > resolved:
> > > - SAMZA-615: Migrate checkpoint from checkpoint topic to Coordinator stream
> > > - SAMZA-617: YARN host affinity in Samza
> > >
> > > Thoughts?
> > >
> > > Thanks!
> > >
> > > -Yi
> > >
> >
>