Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

2016-08-25 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review146813
---




samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java 
(line 29)
<https://reviews.apache.org/r/51346/#comment213511>

Minor: s/is/should not be



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 305)
<https://reviews.apache.org/r/51346/#comment213517>

I think we only need the size(). Is the underlying data structure mutable?



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 542)
<https://reviews.apache.org/r/51346/#comment213530>

Would a simple countdown from initial #SSPs be enough here? We shouldn't 
get more than one EOS message per ssp.
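
A minimal sketch of the countdown idea, assuming exactly one end-of-stream 
message arrives per SSP. The class and method names here are hypothetical and 
are not taken from the patch:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: counts down the SSPs that have not yet reached end-of-stream. */
class EndOfStreamCountdown {
    private final AtomicInteger remaining;

    EndOfStreamCountdown(int initialSspCount) {
        if (initialSspCount < 0) {
            throw new IllegalArgumentException("initialSspCount must be >= 0");
        }
        this.remaining = new AtomicInteger(initialSspCount);
    }

    /**
     * Call once per SSP when its end-of-stream message is seen.
     * Returns true once every SSP has reached end-of-stream.
     */
    boolean onEndOfStream() {
        return remaining.decrementAndGet() <= 0;
    }

    /** True if all SSPs have already reported end-of-stream. */
    boolean isEndOfStream() {
        return remaining.get() <= 0;
    }
}
```

If duplicate EOS messages for the same SSP were possible, a plain counter 
would reach zero too early and a set of pending SSPs would be needed instead.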



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 564)
<https://reviews.apache.org/r/51346/#comment213529>

Minor: move inside if block.



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 
62)
<https://reviews.apache.org/r/51346/#comment213532>

Minor: isEndOfStreamListenerTask.

Will make the usage at AsyncRunLoop:363 clearer.



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala (line 
140)
<https://reviews.apache.org/r/51346/#comment213538>

Minor: Add documentation about what this is.



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala (line 
354)
<https://reviews.apache.org/r/51346/#comment213533>

Just wondering, is 'checkX' our naming convention for methods with a check 
+ side effects?


- Prateek Maheshwari


On Aug. 24, 2016, 2:03 p.m., Jagadish Venkatraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> ---
> 
> (Updated Aug. 24, 2016, 2:03 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data 
> Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Samza currently works with unbounded data sources (Kafka streams). However, 
> for bounded data sources such as HDFS files or snapshot files, we need a 
> notion of 'end-of-stream'. 
> 
> This is a step towards realizing a 'finite' Samza job that terminates once 
> data processing is complete (as opposed to an infinite stream job that keeps 
> running).
> 
> RB changes:
> - New interface for EndOfStreamListener
> - New 'end-of-stream' state in the state machine of AsyncStreamTask 
> (Invariant: when end-of-stream is reached, there are no buffered messages, 
> no callbacks are in flight, and no window or commit call is in progress)
> - Changes to allow clean shut-downs of the tasks/container/job for 
> end-of-stream.
> 
> Design Doc and Implementation Notes: 
> https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf
> 
> 
> Diffs
> -
> 
>   
> samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java 
> cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   
> samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
>  a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
> a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
> 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
> a8355b944cad54faacf5eeb883d8f4b630440757 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
> ca913dea79fecbcecdfd1010dc794318055c5764 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> ---
> 
> Unit tests covering in-order processing, out-of-order processing, 
> and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>



Re: Review Request 51252: SAMZA-1004: Fix some logging and javadoc issues for AsyncStreamTask

2016-08-30 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51252/#review147373
---




samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java (line 25)
<https://reviews.apache.org/r/51252/#comment214492>

multithreaded.

Maybe use asynchronous instead of multithreaded here? Multi-threaded seems 
like an implementation detail.



samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java (line 36)
<https://reviews.apache.org/r/51252/#comment214493>

s/serialized/serial?


- Prateek Maheshwari


On Aug. 30, 2016, 3:01 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51252/
> ---
> 
> (Updated Aug. 30, 2016, 3:01 p.m.)
> 
> 
> Review request for samza, Navina Ramesh and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-1004: Fix some logging and javadoc issues for AsyncStreamTask
> 
> 
> Diffs
> -
> 
>   samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java 
> 684ba0b77fca1c1c5fd8d740597d3b7fcdc0f5cb 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> f786fc08c8f7eced4f4084dc8326b28b6422 
> 
> Diff: https://reviews.apache.org/r/51252/diff/
> 
> 
> Testing
> ---
> 
> gradlew clean build.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 51252: SAMZA-1004: Fix some logging and javadoc issues for AsyncStreamTask

2016-09-01 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51252/#review147583
---


Ship it!




While we're on the topic of logging: I found it useful to have a trace log for 
when the AsyncRunLoop gets unblocked in blockIfBusy. We already have one for 
when it blocks.

- Prateek Maheshwari





Re: Review Request 51142: SAMZA-967: HDFS System Consumer

2016-09-29 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51142/#review150883
---




samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala (line 
66)
<https://reviews.apache.org/r/51142/#comment218956>

"systems.%s.consumer.buffer-capacity" makes sense to me. Regarding the 
"hdfs" prefix, there's already inconsistency in current configs. The kafka 
system configs don't include kafka in the config name, but the hdfs producer 
configs do. The kafka convention is better IMHO.

Either way, we should at least be consistent between this and the new 
partitioner/reader configs which don't have the hdfs prefix.
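
For illustration only, this is how a key following that pattern would be 
expanded for a given system name. The class, constant, and method names are 
hypothetical; only the key pattern comes from the comment above:

```java
/** Hypothetical holder for the consumer config key pattern discussed above. */
class HdfsConsumerConfigKeys {
    // %s is replaced by the system name, as in other "systems.<name>.*" keys.
    static final String CONSUMER_BUFFER_CAPACITY = "systems.%s.consumer.buffer-capacity";

    /** Builds the concrete config key for one system. */
    static String bufferCapacityKey(String systemName) {
        return String.format(CONSUMER_BUFFER_CAPACITY, systemName);
    }
}
```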


- Prateek Maheshwari


On Sept. 28, 2016, 2:57 p.m., Hai Lu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51142/
> ---
> 
> (Updated Sept. 28, 2016, 2:57 p.m.)
> 
> 
> Review request for samza, Yi Pan (Data Infrastructure) and Navina Ramesh.
> 
> 
> Bugs: SAMZA-967
> https://issues.apache.org/jira/browse/SAMZA-967
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Add HDFS System Consumer: 
> 
> 1. System admin, partitioner
> 2. System consumer with metrics
> 
> Design doc can be found here: 
> https://issues.apache.org/jira/secure/attachment/12824078/HDFSSystemConsumer.pdf
> 
> An overview of the high level architecture: 
> 
> The system factory is used by Samza to instantiate SystemConsumer, 
> SystemProducer, and SystemAdmin for a specific system. The 
> FileDataSystemFactory can be reused for other file-system-like sources. 
> 
> HDFSSystemAdmin will start a “DirectoryPartitioner” to figure out the set of 
> HDFS files that need to be consumed for this job. The DirectoryPartitioner 
> also uses “GroupingPattern” to group files into partitions if advanced 
> partitioning is required. HDFSSystemAdmin will then persist the 
> “PartitionDescriptor” to HDFS.
> 
> The HDFSSystemConsumer will then pick up the “PartitionDescriptor” from HDFS. 
> Based on this information as well as the actual assignment of partitions, it 
> would then know which files to read from.
> 
> The initial implementation of the HDFS system consumer supports only Avro 
> data files. It is easy to extend it to other file formats by implementing 
> the FileReader interface.
> 
> 
> [Architecture diagram garbled in the archive. Recoverable labels: HDFS; 
> HDFSAvroFileReader; Obtain Partition Description; Persist Partition 
> Description; Filtering/Grouping.]
>

Re: Review Request 51142: SAMZA-967: HDFS System Consumer

2016-09-29 Thread Prateek Maheshwari


> On Sept. 29, 2016, 10:56 a.m., Prateek Maheshwari wrote:
> > samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala, 
> > line 66
> > <https://reviews.apache.org/r/51142/diff/5/?file=1493810#file1493810line66>
> >
> > "systems.%s.consumer.buffer-capacity" makes sense to me. Regarding the 
> > "hdfs" prefix, there's already inconsistency in current configs. The kafka 
> > system configs don't include kafka in the config name, but the hdfs 
> > producer configs do. The kafka convention is better IMHO.
> > 
> > Either way, we should at least be consistent between this and the new 
> > partitioner/reader configs which don't have the hdfs prefix.

Btw, in the new configs we'll be using camelCase instead of dashes, so we'll 
eventually need to change it to bufferCapacity.


- Prateek


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51142/#review150883
---



Re: Review Request 52403: SAMZA-1028: Moving logline before closing kafka producer and making exception thrown AtomicReference

2016-09-29 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52403/#review150909
---




samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 65)
<https://reviews.apache.org/r/52403/#comment218979>

maybe firstCallbackException?
Don't think we need @volatile anymore.

Minor: Don't need type declaration.
Minor: update documentation.
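
A minimal Java sketch of the "first exception wins" behaviour that the name 
firstCallbackException suggests. The holder class and its methods are 
hypothetical, not the patch's code. Because AtomicReference reads and writes 
already have volatile semantics, a final field holding it needs no extra 
volatile marker:

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical holder that keeps only the first exception reported by send callbacks. */
class FirstCallbackException {
    private final AtomicReference<Exception> first = new AtomicReference<>();

    /** Stores e only if nothing was stored before; returns true if e was stored. */
    boolean record(Exception e) {
        return first.compareAndSet(null, e);
    }

    /** Returns the first recorded exception, or null if none was recorded. */
    Exception get() {
        return first.get();
    }

    /** Clears and returns the stored exception so a caller can rethrow it once. */
    Exception getAndClear() {
        return first.getAndSet(null);
    }
}
```

With compareAndSet(null, e), later callbacks cannot overwrite the original 
failure, which keeps the root cause available for logging.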



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 117)
<https://reviews.apache.org/r/52403/#comment218980>

Can you explain/document what we're trying to achieve here? Is this 
producer meant to somehow heal itself eventually and stop throwing this 
exception?

If that's the case, doesn't that mean that "callbackExceptionFirstThrown" is 
a misnomer? It isn't the exception first thrown anymore.



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 140)
<https://reviews.apache.org/r/52403/#comment218985>

Unrelated to this change: Strange formatting, move to previous line or use 
braces.



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 152)
<https://reviews.apache.org/r/52403/#comment218986>

What does this retry refer to? Looks like we just close the producer in 
case of failure?



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 156)
<https://reviews.apache.org/r/52403/#comment218983>

Minor: use logger.error for consistency.
Minor: capitalize "Closing"



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (lines 157 - 158)
<https://reviews.apache.org/r/52403/#comment218988>

If the producer is meant to heal itself eventually, is this still true?



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 162)
<https://reviews.apache.org/r/52403/#comment218984>

Curious why we use format instead of Scala interpolated strings?



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 202)
<https://reviews.apache.org/r/52403/#comment218982>

Wondering why we rethrow a previously saved exception instead of no-oping 
the flush? Is the user expected to handle this? If so, should document why & 
how.


- Prateek Maheshwari


On Sept. 29, 2016, 12:04 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52403/
> ---
> 
> (Updated Sept. 29, 2016, 12:04 p.m.)
> 
> 
> Review request for samza, Navina Ramesh and Jagadish Venkatraman.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Currently the error is logged after the producer is closed, and later 
> callbacks reset the exception, which makes troubleshooting harder in 
> multithreaded cases. We should log the error before closing and keep an 
> atomic reference to the initial exception.
> 
> 
> Diffs
> -
> 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
>  5ff6d3caf54ed148aa40c7c752c587e556a4f34a 
> 
> Diff: https://reviews.apache.org/r/52403/diff/
> 
> 
> Testing
> ---
> 
> Tested in jobs deployed in Yarn cluster.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 52403: SAMZA-1028: Moving logline before closing kafka producer and making exception thrown AtomicReference

2016-09-29 Thread Prateek Maheshwari


> On Sept. 29, 2016, 12:33 p.m., Prateek Maheshwari wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala,
> >  line 202
> > <https://reviews.apache.org/r/52403/diff/1/?file=1516363#file1516363line202>
> >
> > Wondering why we rethrow a previously saved exception instead of 
> > no-oping the flush? Is the user expected to handle this? If so, should 
> > document why & how.

Nvm this, I think the intention is to throw in the next operation after a send 
error callback.


- Prateek


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52403/#review150909
---


On Sept. 29, 2016, 2 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52403/
> ---
> 
> (Updated Sept. 29, 2016, 2 p.m.)
> 
> 
> Review request for samza, Navina Ramesh and Jagadish Venkatraman.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Currently the error is logged after the producer is closed, and later 
> callbacks reset the exception, which makes troubleshooting harder in 
> multithreaded cases. We should log the error before closing and keep an 
> atomic reference to the initial exception.
> 
> 
> Diffs
> -
> 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
>  5ff6d3caf54ed148aa40c7c752c587e556a4f34a 
> 
> Diff: https://reviews.apache.org/r/52403/diff/
> 
> 
> Testing
> ---
> 
> Tested in jobs deployed in Yarn cluster.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 52403: SAMZA-1028: Moving logline before closing kafka producer and making exception thrown AtomicReference

2016-09-29 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52403/#review150947
---




samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 101)
<https://reviews.apache.org/r/52403/#comment219040>

private def. Also, maybe move private method below public methods?



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 106)
<https://reviews.apache.org/r/52403/#comment219047>

Nitpick: "Producer close ..." (capitalization)



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (lines 108 - 114)
<https://reviews.apache.org/r/52403/#comment219046>

Shouldn't this happen first? Otherwise a concurrent send could end up 
trying to use a closed producer.

Also, my impression was that creating producers is expensive. What impact 
does recycling producers like this have on the kafka cluster and on the 
container resource usage (per producer overhead)?
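
A minimal sketch of the "detach first, then close" ordering, assuming sends 
obtain the producer through a shared reference. ProducerSlot and 
detachAndClose are hypothetical names and this is not the patch's code:

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical slot holding the producer instance that new sends should use. */
class ProducerSlot<P extends AutoCloseable> {
    private final AtomicReference<P> current = new AtomicReference<>();

    ProducerSlot(P initial) {
        current.set(initial);
    }

    /** Returns the current producer, or null if it has been detached. */
    P get() {
        return current.get();
    }

    /**
     * Detaches the failed producer before closing it, so callers of get()
     * that arrive after the detach never receive a closed instance.
     * Returns true if this call performed the close.
     */
    boolean detachAndClose(P failed) throws Exception {
        if (current.compareAndSet(failed, null)) {
            failed.close();
            return true;
        }
        return false; // another thread already detached this producer
    }
}
```

A send that read the reference just before the detach can still hold the old 
producer, so this ordering narrows the race rather than removing it entirely.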



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 165)
<https://reviews.apache.org/r/52403/#comment219043>

We can just remove this comment :)



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 170)
<https://reviews.apache.org/r/52403/#comment219048>

Comment redundant, same information in the line above.



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 173)
<https://reviews.apache.org/r/52403/#comment219049>

"or producer.flush()"



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 218)
<https://reviews.apache.org/r/52403/#comment219042>

If the intention is to allow user to ensure that the container doesn't die 
on producer failures, shouldn't this not throw either? If we end up flushing 
right after the callback exception, the user will not get a chance to ignore 
the exception.


- Prateek Maheshwari


On Sept. 29, 2016, 2:23 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52403/
> ---
> 
> (Updated Sept. 29, 2016, 2:23 p.m.)
> 
> 
> Review request for samza, Navina Ramesh and Jagadish Venkatraman.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Currently the error is logged after the producer is closed, and later 
> callbacks reset the exception, which makes troubleshooting harder in 
> multithreaded cases. We should log the error before closing and keep an 
> atomic reference to the initial exception.
> 
> 
> Diffs
> -
> 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
>  5ff6d3caf54ed148aa40c7c752c587e556a4f34a 
> 
> Diff: https://reviews.apache.org/r/52403/diff/
> 
> 
> Testing
> ---
> 
> Tested in jobs deployed in Yarn cluster.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 52403: SAMZA-1028: Moving logline before closing kafka producer and making exception thrown AtomicReference

2016-09-29 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52403/#review150951
---




samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (lines 60 - 61)
<https://reviews.apache.org/r/52403/#comment219055>

To me, this reads as: "We don't queue up more send requests from the Samza 
thread since we want to store the exception in case of ultimate send failure in 
the I/O thread." But that doesn't make sense since we only stop queueing 
messages _after_ we see this value set. I think this is meant to be the other 
way around? I.e.,

"exceptionInCallback: stores the exception in case of any "ultimate" send 
failure (i.e., failure after exhausting max_retries in Kafka producer) in the 
I/O thread, so that we do not continue to queue up more send requests from the 
Samza thread."

The second sentence could also be: "This helps the Samza thread know about 
I/O thread failures." (Without this field the Samza thread does not even know 
of an I/O failure, so the source is a moot point.)


- Prateek Maheshwari





Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-17 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review152976
---




samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 33)
<https://reviews.apache.org/r/52476/#comment222170>

Usually more readable if you write this as a multiplication: 1 * 24 * 60 * 
60 * 1000L
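
For illustration, the same constant written both ways in Java. The class and 
constant names are hypothetical:

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical constants showing two readable ways to spell "one day in milliseconds". */
class RetentionDefaults {
    // Written as a product so each unit (days, hours, minutes, seconds, millis) is visible.
    // The L suffix makes the result a long; one day fits in an int, but 25 days would not.
    static final long ONE_DAY_MS = 1 * 24 * 60 * 60 * 1000L;

    // Equivalent value from the standard library.
    static final long ONE_DAY_MS_VIA_TIMEUNIT = TimeUnit.DAYS.toMillis(1);
}
```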



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (line 
532)
<https://reviews.apache.org/r/52476/#comment222189>

Prefer passing the one config that we need explicitly instead of passing 
the config object.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 26)
<https://reviews.apache.org/r/52476/#comment222171>

Delete or import explicitly.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 29)
<https://reviews.apache.org/r/52476/#comment222190>

Unrelated to RB but prefer explicit imports.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 71)
<https://reviews.apache.org/r/52476/#comment222173>

SystemClock exists so that you can pass a "Clock" to your method/class and 
mock it in tests. Let's either do that (preferred) or use 
System.currentTimeMillis() directly.
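
A minimal sketch of the preferred option, assuming the staleness rule from 
the description (offset file last modified more than delete.retention.ms 
ago). MillisClock and StoreStalenessChecker are hypothetical stand-ins, not 
the actual Samza types:

```java
/** Hypothetical single-method clock, standing in for the Clock abstraction mentioned above. */
interface MillisClock {
    long currentTimeMillis();
}

/** Hypothetical staleness check that receives its time source as a dependency. */
class StoreStalenessChecker {
    private final MillisClock clock;
    private final long deleteRetentionMs;

    StoreStalenessChecker(MillisClock clock, long deleteRetentionMs) {
        this.clock = clock;
        this.deleteRetentionMs = deleteRetentionMs;
    }

    /** A store is stale if its offset file was last modified more than deleteRetentionMs ago. */
    boolean isStaleStore(long offsetFileLastModifiedMs) {
        return clock.currentTimeMillis() - offsetFileLastModifiedMs > deleteRetentionMs;
    }
}
```

Production code could pass System::currentTimeMillis, while a test can pass a 
fixed lambda such as () -> 10_000L to make the result deterministic.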



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 98)
<https://reviews.apache.org/r/52476/#comment222184>

Looks like we've updated `fileOffset` in `#readOffsetFile` as a side effect 
even when the store is stale. Is that what we want here?



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 100)
<https://reviews.apache.org/r/52476/#comment222175>

Add an INFO message here.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 106)
<https://reviews.apache.org/r/52476/#comment222176>

Add method description.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 113)
<https://reviews.apache.org/r/52476/#comment222177>

Another case we ran into on Friday - if the oldest offset in the changelog 
topic is newer than the offset in the OFFSET file. Do you need to handle that 
here?

Nitpick: would isStaleStore be clearer?



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 118)
<https://reviews.apache.org/r/52476/#comment222180>

Looks like this is already logged at line 163?



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 119)
<https://reviews.apache.org/r/52476/#comment222179>

Don't `return` in scala code.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 121)
<https://reviews.apache.org/r/52476/#comment222181>

Mention somewhere in the message that this means that the store is stale.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 150)
<https://reviews.apache.org/r/52476/#comment222178>

I'd prefer to split this into two methods - existence check and file read. 
Would be even nicer if fileOffset was updated explicitly (after staleness 
checks etc.) and not as a side effect of reading the file.

If you don't, let's add return type to method signature.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 155)
<https://reviews.apache.org/r/52476/#comment222182>

Unrelated, but let's make this info.


- Prateek Maheshwari


On Oct. 17, 2016, 3:40 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> ---
> 
> (Updated Oct. 17, 2016, 3:40 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Every local task store is backed up by a Kafka changelog topic. Due to log 
> compaction, delete tombstones in the changelog topic have a TTL of 
> delete.retention.ms. Replaying a changelog whose delete tombstones have 
> already been removed would create an inconsistent local store (because some 
> delete events are missing). During container startup, this patch deletes any 
> local store whose offset file was last modified more than 
> delete.retention.ms ago.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
> 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/Storag

Re: Review Request 52403: SAMZA-1028: Moving logline before closing kafka producer and making exception thrown AtomicReference

2016-10-18 Thread Prateek Maheshwari


> On Sept. 29, 2016, 2:30 p.m., Prateek Maheshwari wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala,
> >  line 220
> > <https://reviews.apache.org/r/52403/diff/2/?file=1516385#file1516385line220>
> >
> > If the intention is to allow user to ensure that the container doesn't 
> > die on producer failures, shouldn't this not throw either? If we end up 
> > flushing right after the callback exception, the user will not get a chance 
> > to ignore the exception.

Someone ran into this issue today.


- Prateek


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52403/#review150947
---





Review Request 53027: SAMZA-1017 - Added disk quota based throttling to AsyncRunLoop.

2016-10-19 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53027/
---

Review request for samza and Xinyu Liu.


Bugs: SAMZA-1017
https://issues.apache.org/jira/browse/SAMZA-1017


Repository: samza


Description
---

Added disk quota based throttling to AsyncRunLoop.

Overview:
Adds a Throttleable interface, implemented by SyncRunLoop, AsyncRunLoop and 
ThrottlingExecutor
Adds a RunLoop interface, implemented by SyncRunLoop (formerly RunLoop) and 
AsyncRunLoop. 
When AsyncRunLoop is throttled, it delays the onComplete() callback from 
processAsync() by a delay amount appropriate for the desired work factor.
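
One way to read "a delay amount appropriate for the desired work factor", 
assuming the work factor is the target fraction of wall-clock time spent 
working, so that work / (work + delay) = workFactor. The helper below is a 
hypothetical illustration of that assumption and not necessarily what the 
patch computes:

```java
/** Hypothetical helper deriving an idle delay from a work factor in (0.0, 1.0]. */
class ThrottleMath {
    /**
     * Returns the idle time in nanoseconds to add after workNanos of work so that
     * workNanos / (workNanos + delay) equals workFactor.
     */
    static long delayNanos(long workNanos, double workFactor) {
        if (workFactor <= 0.0 || workFactor > 1.0) {
            throw new IllegalArgumentException("workFactor must be in (0.0, 1.0]");
        }
        // Ratio of idle time to work time implied by the work factor.
        double idlePerWork = (1.0 - workFactor) / workFactor;
        return (long) (idlePerWork * workNanos);
    }
}
```

Under this reading, a work factor of 1.0 adds no delay, and a work factor of 
0.5 adds a delay equal to the processing time itself.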

This implementation has a couple of known issues:
1. Adding additional delay to process()/processAsync() callback will not 
throttle the run loop as long as task processing rate > message throughput. 
E.g., a low QPS stream with process() time < message inter-arrival time. If 
desirable, this can be addressed by delaying based on the total run loop time 
instead of just the process() time.

2. If throttled, users can increase their throughput back to original by 
increasing task.max.concurrency and redeploying their jobs. I don't have a 
simple solution for this, suggestions are welcome.


Diffs
-

  samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
a789d04 
  
samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java
 21fbca2 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0 
  samza-core/src/main/java/org/apache/samza/util/Throttleable.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java 
afcc4c5 
  samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java 
PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 538ebb8 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
05a996c 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913de 
  samza-core/src/test/java/org/apache/samza/util/TestThrottlingScheduler.java 
PRE-CREATION 
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
aa1a8d6 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
cff6b96 

Diff: https://reviews.apache.org/r/53027/diff/


Testing
---

Tested locally with a hello world app.


Thanks,

Prateek Maheshwari



Re: Review Request 53027: SAMZA-1017 - Added disk quota based throttling to AsyncRunLoop.

2016-10-19 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53027/
---

(Updated Oct. 19, 2016, 10:54 a.m.)


Review request for samza and Xinyu Liu.


Changes
---

Updated description.


Bugs: SAMZA-1017
https://issues.apache.org/jira/browse/SAMZA-1017


Repository: samza


Description (updated)
---

Added disk quota based throttling to AsyncRunLoop.

Overview:
Adds a Throttleable interface, implemented by RunLoop, AsyncRunLoop and 
ThrottlingExecutor.
When AsyncRunLoop is throttled, it delays the onComplete() callback from 
processAsync() by a delay amount appropriate for the desired work factor.

This implementation has a couple of known issues:
1. Adding delay to the process()/processAsync() callback will not 
throttle the run loop as long as task processing rate > message throughput, 
e.g., a low QPS stream with process() time < message inter-arrival time. If 
desirable, this can be addressed by delaying based on the total run loop time 
instead of just the process() time.

2. If throttled, users can increase their throughput back to the original by 
increasing task.max.concurrency and redeploying their jobs. I don't have a 
simple solution for this; suggestions are welcome.
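
To make the work factor arithmetic concrete, here is a minimal sketch. It 
assumes the work factor is the fraction of wall-clock time that may be spent 
working, so that work / (work + delay) equals the work factor. The class and 
method names are hypothetical and are not taken from the patch.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; not the code under review. */
final class ThrottleDelaySketch {
    private ThrottleDelaySketch() { }

    /**
     * Returns the idle time (in nanoseconds) to add after workNanos of work
     * so that work / (work + idle) == workFactor.
     * Assumption: workFactor is in (0, 1], where 1.0 means "no throttling".
     */
    static long delayNanos(long workNanos, double workFactor) {
        if (workFactor <= 0.0 || workFactor > 1.0) {
            throw new IllegalArgumentException("workFactor must be in (0, 1]");
        }
        if (workNanos < 0) {
            throw new IllegalArgumentException("workNanos must be >= 0");
        }
        return (long) (workNanos * (1.0 - workFactor) / workFactor);
    }

    public static void main(String[] args) {
        long processNanos = TimeUnit.MILLISECONDS.toNanos(2);   // time spent in process()
        long runLoopNanos = TimeUnit.MILLISECONDS.toNanos(10);  // one full loop iteration

        // Delay derived from process() time only.
        System.out.println(delayNanos(processNanos, 0.5));  // prints 2000000
        // Delay derived from the total run loop time.
        System.out.println(delayNanos(runLoopNanos, 0.5));  // prints 10000000
    }
}
```

With a 2 ms process() time and a 10 ms message inter-arrival time, a work 
factor of 0.5 adds only 2 ms of delay, which fits inside the idle gap between 
messages, so throughput does not change (known issue 1). Basing the delay on 
the roughly 10 ms run loop iteration instead adds 10 ms and does slow the loop.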


Diffs
-

  samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
a789d04 
  
samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java
 21fbca2 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0 
  samza-core/src/main/java/org/apache/samza/util/Throttleable.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java 
afcc4c5 
  samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java 
PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 538ebb8 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
05a996c 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913de 
  samza-core/src/test/java/org/apache/samza/util/TestThrottlingScheduler.java 
PRE-CREATION 
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
aa1a8d6 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
cff6b96 

Diff: https://reviews.apache.org/r/53027/diff/


Testing
---

Tested locally with a hello world app.


Thanks,

Prateek Maheshwari



Re: Review Request 53027: SAMZA-1017 - Added disk quota based throttling to AsyncRunLoop.

2016-10-19 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53027/
---

(Updated Oct. 19, 2016, 11:30 a.m.)


Review request for samza and Xinyu Liu.


Changes
---

Rebased from HEAD.


Bugs: SAMZA-1017
https://issues.apache.org/jira/browse/SAMZA-1017


Repository: samza


Description
---

Added disk quota based throttling to AsyncRunLoop.

Overview:
Adds a Throttleable interface, implemented by RunLoop, AsyncRunLoop and 
ThrottlingExecutor.
When AsyncRunLoop is throttled, it delays the onComplete() callback from 
processAsync() by a delay amount appropriate for the desired work factor.

This implementation has a couple of known issues:
1. Adding delay to the process()/processAsync() callback will not 
throttle the run loop as long as task processing rate > message throughput, 
e.g., a low QPS stream with process() time < message inter-arrival time. If 
desirable, this can be addressed by delaying based on the total run loop time 
instead of just the process() time.

2. If throttled, users can increase their throughput back to the original by 
increasing task.max.concurrency and redeploying their jobs. I don't have a 
simple solution for this; suggestions are welcome.


Diffs (updated)
-

  samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
a789d04 
  
samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java
 21fbca2 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 77eceea 
  samza-core/src/main/java/org/apache/samza/util/Throttleable.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java 
afcc4c5 
  samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java 
PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 538ebb8 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
05a996c 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 3263e54 
  samza-core/src/test/java/org/apache/samza/util/TestThrottlingScheduler.java 
PRE-CREATION 
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
aa1a8d6 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
cff6b96 

Diff: https://reviews.apache.org/r/53027/diff/


Testing
---

Tested locally with a hello world app.


Thanks,

Prateek Maheshwari



Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-20 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153478
---



Looks pretty good, a few final comments.


samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java (line 
253)
<https://reviews.apache.org/r/52476/#comment222805>

Does jobConfig.getChangeLog...() (implicit conversion) not work?



samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 57)
<https://reviews.apache.org/r/52476/#comment222811>

getChangeLogDeleteRetentionsInMs



samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 58)
<https://reviews.apache.org/r/52476/#comment222812>

See if you can use the named operator instead of the symbolic operator.

I think you might be able to .toMap on the list of pairs.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 104)
<https://reviews.apache.org/r/52476/#comment222827>

Indent by 4 (or whatever continuation indent is)



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 106)
<https://reviews.apache.org/r/52476/#comment222821>

If loggedStoreDir isn't present we put null into fileOffset. If that's the 
expected behavior, let's log at info in isStateLoggedStore if 
(!loggedStoreDir.exists())



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 108)
<https://reviews.apache.org/r/52476/#comment222815>

Misleading comment, could be the other condition too.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 111)
<https://reviews.apache.org/r/52476/#comment222818>

Will be useful to log at info the read file offset  here (or in 
readOffsetFile)



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 112)
<https://reviews.apache.org/r/52476/#comment222813>

Indent by 4 (or whatever continuation indent is)



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 118)
<https://reviews.apache.org/r/52476/#comment222819>

Add method description.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 130)
<https://reviews.apache.org/r/52476/#comment222824>

First %s should be store name. Add another %s at the end for loggedStoreDir.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 133)
<https://reviews.apache.org/r/52476/#comment222822>

Log both last modified time and delete retention ms values too.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 133)
<https://reviews.apache.org/r/52476/#comment222823>

Log both last modified time and delete retention ms values too.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 182)
<https://reviews.apache.org/r/52476/#comment222826>

s/partition/logged storage partition to be consistent with next message.



samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala (line 144)
<https://reviews.apache.org/r/52476/#comment222830>

implicit conversion should probably work.


- Prateek Maheshwari


On Oct. 19, 2016, 3:04 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> ---
> 
> (Updated Oct. 19, 2016, 3:04 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Every local task store is backed up by a kafka changelog topic. Due to log 
> compaction, delete tombstones in the changelog topic have a ttl of 
> delete.retention.ms. Replaying events from a changelog that is missing 
> delete tombstones would create an inconsistent local store (because some 
> delete events are missing). During container startup, this patch deletes the 
> local stores for which the difference between the current time and the last 
> modified time of the offset file is greater than delete.retention.ms.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
> 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
> be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> 0b7bcdda1639eea8239a69c31bdf42558

Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-24 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153741
---


Ship it!




Looks good to me, thanks.


samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 32)
<https://reviews.apache.org/r/52476/#comment223156>

s/CHANGE_LOG/CHANGELOG



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 138)
<https://reviews.apache.org/r/52476/#comment223158>

s/is greater than/is older than.


- Prateek Maheshwari


On Oct. 22, 2016, 3:06 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> ---
> 
> (Updated Oct. 22, 2016, 3:06 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Every local task store is backed up by a kafka changelog topic. Due to log 
> compaction, delete tombstones in the changelog topic have a ttl of 
> delete.retention.ms. Replaying events from a changelog that is missing 
> delete tombstones would create an inconsistent local store (because some 
> delete events are missing). During container startup, this patch deletes the 
> local stores for which the difference between the current time and the last 
> modified time of the offset file is greater than delete.retention.ms.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
> 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
> be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   
> samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
>  4d40f520e54beb643acd8410c772b75e2f6a9162 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
> 973ab8cfb3d248bec7efe5e338f5e667f097556d 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> ---
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>
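
The staleness rule in the quoted description above (offset file last modified 
longer ago than delete.retention.ms) can be sketched as follows. This is only 
an illustration under stated assumptions: the class and method names are 
hypothetical, and the actual check in TaskStorageManager.scala may be 
structured differently.

```java
import java.io.File;

/** Hypothetical helper for illustration; not the code under review. */
final class StaleStoreCheckSketch {
    private StaleStoreCheckSketch() { }

    /**
     * True when the time since the last modification is strictly greater
     * than the changelog's delete retention, i.e. the store is "older than"
     * the delete tombstones and replaying the changelog could miss deletes.
     */
    static boolean isOlderThanRetention(long lastModifiedMs, long nowMs, long deleteRetentionMs) {
        return nowMs - lastModifiedMs > deleteRetentionMs;
    }

    /**
     * Same check against a file on disk. File.lastModified() returns 0L when
     * the file does not exist, so a missing offset file looks very old here;
     * whether that is the desired policy is an assumption of this sketch.
     */
    static boolean isOlderThanRetention(File offsetFile, long nowMs, long deleteRetentionMs) {
        return isOlderThanRetention(offsetFile.lastModified(), nowMs, deleteRetentionMs);
    }
}
```

Logging lastModifiedMs and deleteRetentionMs alongside the decision, as 
requested in the earlier review, would make a store deletion easy to explain 
from the container logs.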



Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-02 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/#review154602
---




samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 588)
<https://reviews.apache.org/r/53282/#comment224235>

Typo: Envelope



samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java (line 
79)
<https://reviews.apache.org/r/53282/#comment224236>

This would probably be good to have as info.



samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java (line 
88)
<https://reviews.apache.org/r/53282/#comment224238>

Minor: s/completeCallbacks/completedCallbacks?



samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java (line 
88)
<https://reviews.apache.org/r/53282/#comment224239>

Minor: s/completeCallbacks/completedCallbacks?



samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java (line 
95)
<https://reviews.apache.org/r/53282/#comment224237>

Minor: Is this within the max line width?



samza-core/src/main/java/org/apache/samza/util/TimerClock.java (line 25)
<https://reviews.apache.org/r/53282/#comment224214>

Can we just add this method to the existing Clock interface? Weird to have 
two clock interfaces.



samza-core/src/main/java/org/apache/samza/util/Utils.java (line 38)
<https://reviews.apache.org/r/53282/#comment224240>

Prefer not having a util class for this one method, which I think is only 
used in one place? Also don't think this is worth having a util method for.



samza-core/src/main/java/org/apache/samza/util/Utils.java (line 38)
<https://reviews.apache.org/r/53282/#comment224241>





samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala (line 32)
<https://reviews.apache.org/r/53282/#comment224242>

Minor: s/timer/timers (same for field name)



samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala (line 63)
<https://reviews.apache.org/r/53282/#comment224245>

Minor: s/using/use



samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala (line 66)
<https://reviews.apache.org/r/53282/#comment224248>

Config objects shouldn't return actual instances. It should just return the 
config value and call site should create the actual/no-op clock based on it and 
pass it around to whoever needs it.
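
A rough sketch of the split I have in mind, written in Java for brevity. The 
config key, class names, and the use of LongSupplier as the clock type are all 
assumptions for illustration and are not part of the patch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical config wrapper: it only exposes the configured value. */
final class MetricsConfigSketch {
    // Hypothetical key name, used only in this sketch.
    static final String TIMERS_ENABLED = "metrics.timers.enabled";

    private final Map<String, String> config;

    MetricsConfigSketch(Map<String, String> config) {
        this.config = config;
    }

    /** Returns the raw setting; defaults to true when the key is absent. */
    boolean isTimersEnabled() {
        return Boolean.parseBoolean(config.getOrDefault(TIMERS_ENABLED, "true"));
    }
}

/** Hypothetical call site: decides which clock to build and passes it on. */
final class ClockWiringSketch {
    private ClockWiringSketch() { }

    static LongSupplier createClock(boolean timersEnabled) {
        // Real clock when timers are on, constant no-op clock otherwise.
        return timersEnabled ? System::nanoTime : () -> 0L;
    }

    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put(MetricsConfigSketch.TIMERS_ENABLED, "false");

        MetricsConfigSketch config = new MetricsConfigSketch(raw);
        LongSupplier clock = createClock(config.isTimersEnabled());
        System.out.println(clock.getAsLong()); // prints 0, the no-op clock
    }
}
```

The config class stays a plain value holder, and whoever wires up the 
container owns the decision about which clock implementation to hand out.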



samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala (line 68)
<https://reviews.apache.org/r/53282/#comment224244>

Don't need `return` here (and later).



samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala (line 72)
<https://reviews.apache.org/r/53282/#comment224243>

Minor: Move to previous line.



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala (line 
65)
<https://reviews.apache.org/r/53282/#comment224246>

I don't think we should ever be creating a default new SerdeManager just 
for this class. Same for SystemConsumerMetrics (and other places where this 
pattern is used for objects). Constants are fine.



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala (line 
108)
<https://reviews.apache.org/r/53282/#comment224247>

Same here, not sure if we should use the default.



samza-core/src/main/scala/org/apache/samza/util/Util.scala (line 397)
<https://reviews.apache.org/r/53282/#comment224215>

This seems like a weird method to have. Would prefer to remove.



samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
 (line 137)
<https://reviews.apache.org/r/53282/#comment224249>

See comment about passing this in instead.


- Prateek Maheshwari


On Nov. 2, 2016, 10:56 a.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53282/
> ---
> 
> (Updated Nov. 2, 2016, 10:56 a.m.)
> 
> 
> Review request for samza, Chris Pettitt, Jake Maes, and Navina Ramesh.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> In the recent experiments of samza batch job (consuming hdfs data on hadoop), 
> the results are subpar to map/reduce and spark. By looking at the metrics 
> closely, we found two basic problems:
> 
> 1) Not enough data to process. We spotted this because the unprocessed 
> message queue length was zero much of the time.
> 
> 2) Not processing fast enough. We found samza performed similarly for both 
> medium size records (100B) and small records (10B), while spark can scale 
> very well with small records (over 1M/s).
> 
> The first problem is solved by increasi

Re: Review Request 52168: Tasks endpoint to list the complete details of all tasks related to a job

2016-11-02 Thread Prateek Maheshwari
e 
to clarify what this is.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(line 98)
<https://reviews.apache.org/r/52168/#comment224326>

Extract val for metadata cache, systemAdmins.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(lines 99 - 102)
<https://reviews.apache.org/r/52168/#comment224340>

Not required for the other place they're used?



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(line 109)
<https://reviews.apache.org/r/52168/#comment224333>

Can move to previous line. Also pass MetricsRegistry.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(line 110)
<https://reviews.apache.org/r/52168/#comment224334>

Should probably be a class val (constant).



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(lines 110 - 123)
<https://reviews.apache.org/r/52168/#comment224357>

Can't tell in RB: could this be replaced by retrieveJobModel()?



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(line 231)
<https://reviews.apache.org/r/52168/#comment224335>

Feels like we're logging this in multiple places. Just make sure we're not 
double logging this.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(line 250)
<https://reviews.apache.org/r/52168/#comment224336>

Don't need intermediate val.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(line 277)
<https://reviews.apache.org/r/52168/#comment224338>

Unrelated to RB: mutable.Map instead of util.HashMap?



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(line 307)
<https://reviews.apache.org/r/52168/#comment224339>

Previous line.



samza-rest/src/main/java/org/apache/samza/rest/model/Partition.java (line 27)
<https://reviews.apache.org/r/52168/#comment224341>

"partition id"



samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
 (line 51)
<https://reviews.apache.org/r/52168/#comment224344>

config.getConfigJobConfigFactory?



samza-rest/src/main/java/org/apache/samza/rest/resources/BaseResourceConfig.java
 (line 40)
<https://reviews.apache.org/r/52168/#comment224349>

Is the CONFIG_ prefix a samza-rest convention?


- Prateek Maheshwari


On Nov. 2, 2016, 5:58 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52168/
> ---
> 
> (Updated Nov. 2, 2016, 5:58 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This patch contains the following changes:
>  * HTTP GET API to list the complete details of all the tasks that belong to 
> a job. 
>  * Refactored some methods in coordinator stream to reuse the existing 
> functionality of getting jobConfig from the coordinator stream.
> 
> 
> Diffs
> -
> 
>   docs/learn/documentation/versioned/rest/resource-directory.md 
> 79746d1e2eb3491e4bd26c3c7cf6c7efd150d8ef 
>   docs/learn/documentation/versioned/rest/resources/tasks.md PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
> 13b72fae7815ddaea7ae03a24f1a426ca51613cc 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> df63b97e9d598ecd1840111ba490a723e410d089 
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 
> 022b480856483059cb9f837a08f97a718bc04c31 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 
> c4836f202f7eda1d4e71eac94fd48e46207b0316 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
>  fcabc69a829fd26b7f4e422d9877ec0364d308ce 
>   samza-rest/src/main/config/samza-rest.properties 
> 7be0b47d1466d2199ae278247e8d81522fb6a91c 
>   samza-rest/src/main/java/org/apache/samza/rest/model/Partition.java 
> PRE-CREATION 
>   samza-rest/src/main/java/org/apache/samza/rest/model/Task.java PRE-CREATION 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
>  4d8647f3e1e650632e38b47ba5a8a2dac004f545 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java 
> 067711a74e5b0d7277a9c8b2d2517b56e9cfbcca 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
>  a935c98730f85f448c688a6baf2e8ddffdbb2cb4 
>   
> samza-rest/src/main/java/org/apache/samza

Re: Review Request 52168: Tasks endpoint to list the complete details of all tasks related to a job

2016-11-04 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52168/#review154832
---




samza-core/src/main/scala/org/apache/samza/util/Util.scala (line 218)
<https://reviews.apache.org/r/52168/#comment224555>

    


- Prateek Maheshwari


On Nov. 3, 2016, 6:04 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52168/
> ---
> 
> (Updated Nov. 3, 2016, 6:04 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This patch contains the following changes:
>  * HTTP GET API to list the complete details of all the tasks that belong to 
> a job. 
>  * Refactored some methods in coordinator stream to reuse the existing 
> functionality of getting jobConfig from the coordinator stream.
> 
> 
> Diffs
> -
> 
>   docs/learn/documentation/versioned/rest/resource-directory.md 
> 79746d1e2eb3491e4bd26c3c7cf6c7efd150d8ef 
>   docs/learn/documentation/versioned/rest/resources/tasks.md PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
> 13b72fae7815ddaea7ae03a24f1a426ca51613cc 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> df63b97e9d598ecd1840111ba490a723e410d089 
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 
> 022b480856483059cb9f837a08f97a718bc04c31 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 
> c4836f202f7eda1d4e71eac94fd48e46207b0316 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
>  fcabc69a829fd26b7f4e422d9877ec0364d308ce 
>   samza-rest/src/main/config/samza-rest.properties 
> 7be0b47d1466d2199ae278247e8d81522fb6a91c 
>   samza-rest/src/main/java/org/apache/samza/rest/model/Partition.java 
> PRE-CREATION 
>   samza-rest/src/main/java/org/apache/samza/rest/model/Task.java PRE-CREATION 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
>  4d8647f3e1e650632e38b47ba5a8a2dac004f545 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java 
> 067711a74e5b0d7277a9c8b2d2517b56e9cfbcca 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
>  a935c98730f85f448c688a6baf2e8ddffdbb2cb4 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java
>  11d93d4608d23a4e3fb3bfc50dfac35ab6dbdf3c 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java 
> PRE-CREATION 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxyFactory.java
>  PRE-CREATION 
>   samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxy.java 
> PRE-CREATION 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxyFactory.java
>  PRE-CREATION 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskResourceConfig.java
>  PRE-CREATION 
>   
> samza-rest/src/main/java/org/apache/samza/rest/resources/BaseResourceConfig.java
>  PRE-CREATION 
>   
> samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java
>  e0224c6bcf4aeaa336e5786ac472482507fcd382 
>   samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java 
> a566db598c284d69ea61af88fdc0851483d5a089 
>   
> samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java
>  527482d2ee55747e7b3f9c54c8a3b1afe7ad8797 
>   samza-rest/src/main/java/org/apache/samza/rest/resources/Responses.java 
> PRE-CREATION 
>   samza-rest/src/main/java/org/apache/samza/rest/resources/TasksResource.java 
> PRE-CREATION 
>   
> samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java
>  7db437b348ecd286185898b8f8ab0220d59da71a 
>   
> samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java
>  PRE-CREATION 
>   
> samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockInstallationFinder.java
>  PRE-CREATION 
>   
> samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockResourceFactory.java
>  PRE-CREATION 
>   
> samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java
>  PRE-CREATION 
>   
> samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxyFactory.java
>  PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/52168/diff/
> 
> 
> Testing
> ---
> 
> Manual and unit testing has been done to verify the apis.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>



Re: Review Request 52168: Tasks endpoint to list the complete details of all tasks related to a job

2016-11-04 Thread Prateek Maheshwari


> On Nov. 2, 2016, 11:32 p.m., Prateek Maheshwari wrote:
> > docs/learn/documentation/versioned/rest/resources/tasks.md, line 55
> > <https://reviews.apache.org/r/52168/diff/8/?file=1552720#file1552720line55>
> >
> > What's the difference b/w containerId and containerName?
> 
> Shanthoosh Venkataraman wrote:
> Container name is the unique name that identifies a container within a 
> job. Exposing this information is useful when killing containers (or 
> performing related admin actions on them). Currently the container name is 
> generated by prefixing the container id with a string.

I don't think adding a separate container name concept is necessary. The 
containerId is sufficient to uniquely identify the container. The 
"samza-container" prefix makes sense in the MDC for disambiguation in the log 
lines, but not sure what it buys us here.


> On Nov. 2, 2016, 11:32 p.m., Prateek Maheshwari wrote:
> > docs/learn/documentation/versioned/rest/resources/tasks.md, line 79
> > <https://reviews.apache.org/r/52168/diff/8/?file=1552720#file1552720line79>
> >
> > What's a job instance? If you're referring to i001, that's LI 
> > terminology.
> > 
> > "as an argument"
> 
> Shanthoosh Venkataraman wrote:
> Job instance here means the (jobId, jobName) tuple in an error message. This 
> is used consistently in samza-rest services for this particular type of error 
> message.

Makes sense, thanks.


> On Nov. 2, 2016, 11:32 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala,
> >  lines 110-123
> > <https://reviews.apache.org/r/52168/diff/8/?file=1552723#file1552723line110>
> >
> > Can't tell in RB: could this be replaced by retrieveJobModel()?
> 
> Shanthoosh Venkataraman wrote:
> There are a lot of state changes that have to be done after 
> retrieveJobModel. LocalityManager and changeLogManager are required for that. 
> This refactoring would make it unclean.

Makes sense, thanks.


> On Nov. 2, 2016, 11:32 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala,
> >  line 309
> > <https://reviews.apache.org/r/52168/diff/8/?file=1552723#file1552723line309>
> >
> > Unrelated to RB: mutable.Map instead of util.HashMap?
> 
> Shanthoosh Venkataraman wrote:
> I would really like to make this change; however, util.Map and util.Set 
> are used extensively throughout this class. If I change only this one to 
> collection.mutable.Map, it will look incoherent, and changing every other 
> place would be a larger effort.

Thanks for checking, let's do it later.


> On Nov. 2, 2016, 11:32 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala,
> >  lines 53-54
> > <https://reviews.apache.org/r/52168/diff/8/?file=1552723#file1552723line53>
> >
> > Unrelated to RB: Why does this use both? One of these is deprecated.

For context, we decided to punt on this for now since it's not a trivial 
refactor.


> On Nov. 2, 2016, 11:32 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, 
> > line 104
> > <https://reviews.apache.org/r/52168/diff/8/?file=1552722#file1552722line104>
> >
> > Don't think we need a Util method for a string concat.
> 
> Shanthoosh Venkataraman wrote:
> Container name is used in multiple places (SamzaContainer and 
> TasksResource). The way in which the container name is constructed from the 
> container id could change in the future, hence this was added just to 
> encapsulate that here.

See comment above.


> On Nov. 2, 2016, 11:32 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala,
> >  lines 99-102
> > <https://reviews.apache.org/r/52168/diff/8/?file=1552723#file1552723line99>
> >
> > Not required for the other place they're used?
> 
> Shanthoosh Venkataraman wrote:
> Yes.

"Yes, it's required" or "Yes, it's not required"? If the latter, why?


> On Nov. 2, 2016, 11:32 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala,
> >  line 252
> > <https://reviews.apache.org/r/52168/diff/8/?file=1552723#file1552723line252>
> >
> > Feels like we're logging this in multiple places. Just make sure we're 
> > not double logging this.
> 
> Shanthoosh Venkataraman wrote:
> Each of

Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-08 Thread Prateek Maheshwari


> On Nov. 2, 2016, 12:57 p.m., Prateek Maheshwari wrote:
> >

Sorry for the late reply, didn't get an email notification for your replies.


> On Nov. 2, 2016, 12:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, 
> > line 65
> > <https://reviews.apache.org/r/53282/diff/1/?file=1548713#file1548713line65>
> >
> > I don't think we should ever be creating a default new SerdeManager 
> > just for this class. Same for SystemConsumerMetrics (and other places where 
> > this pattern is used for objects). Constants are fine.
> 
> Xinyu Liu wrote:
> I think these are created as default for the testing purpose. Normally we 
> will pass in the real SerdeManager.

I'd still argue that we shouldn't do this.
1. It makes it possible to accidentally forget passing objects which are 
actually necessary for this class to be functional.
2. Looking at the signature of these constructors, there's no indication 
whether the field is really required or optional.
3. We rely on this pattern in other places for automatically creating objects. 
It makes looking up what classes are being used where more difficult.

The only benefit is saving a few characters when instantiating in tests. Would 
strongly prefer always passing objects explicitly.
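
As a rough illustration of what I mean by explicit passing, here is a Java 
sketch (Java has no default arguments, so the closest analogue to the Scala 
pattern would be a convenience constructor that quietly builds its own 
dependency). All names below are hypothetical stand-ins, not the real 
SystemConsumers or SerdeManager signatures.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/** Hypothetical stand-in for a serde dependency. */
interface SerdeManagerSketch {
    byte[] toBytes(String message);
}

/** Hypothetical consumer class with a single, explicit constructor. */
final class SystemConsumersSketch {
    private final SerdeManagerSketch serdeManager;

    // No overload that creates a default SerdeManagerSketch: the signature
    // itself shows the dependency is required, and forgetting it fails fast.
    SystemConsumersSketch(SerdeManagerSketch serdeManager) {
        this.serdeManager = Objects.requireNonNull(serdeManager, "serdeManager is required");
    }

    int encodedLength(String message) {
        return serdeManager.toBytes(message).length;
    }

    public static void main(String[] args) {
        // Tests pay a few extra characters to pass the dependency explicitly.
        SystemConsumersSketch consumers =
            new SystemConsumersSketch(m -> m.getBytes(StandardCharsets.UTF_8));
        System.out.println(consumers.encodedLength("abc")); // prints 3
    }
}
```

Every construction site then names the object it depends on, which also 
makes a "find usages" search on the dependency class complete.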


> On Nov. 2, 2016, 12:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 397
> > <https://reviews.apache.org/r/53282/diff/1/?file=1548714#file1548714line397>
> >
> > This seems like a weird method to have. Would prefer to remove.
> 
> Xinyu Liu wrote:
> This makes it a lot easier to convert a Java TimerClock object to a Scala 
> function that returns the time. I agree it's not pretty, but there has to be a 
> workaround for converting this right now.

Can we pass TimerClock to Scala classes too? Should be cleaner.


> On Nov. 2, 2016, 12:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/util/TimerClock.java, line 25
> > <https://reviews.apache.org/r/53282/diff/1/?file=1548709#file1548709line25>
> >
> > Can we just add this method to the existing Clock interface? Weird to 
> > have two clock interfaces.
> 
> Xinyu Liu wrote:
> Chris raised the same question. The other interface has an extra method I 
> don't need, plus I want to use a lambda for this. I think I can make 
> HighResolutionClock extend this one. Does that sound better?

As you mentioned earlier, moving the other method in HighResolutionClock to the 
calling class and using it instead makes sense.


> On Nov. 2, 2016, 12:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala, line 
> > 33
> > <https://reviews.apache.org/r/53282/diff/1/?file=1548711#file1548711line33>
> >
> > Minor: s/timer/timers (same for field name)
> 
> Xinyu Liu wrote:
> I put this config to mean all timer metrics are turned on/off. So I guess 
> this config name should be fine.

Plural seems more appropriate grammatically since this applies to all timers.


- Prateek


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/#review154602
---


On Nov. 7, 2016, 4:47 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53282/
> ---
> 
> (Updated Nov. 7, 2016, 4:47 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Jake Maes, and Navina Ramesh.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> In recent experiments with a Samza batch job (consuming HDFS data on Hadoop), 
> the results were subpar compared to MapReduce and Spark. Looking at the metrics 
> closely, we found two basic problems:
> 
> 1) Not enough data to process. The unprocessed message queue length was 
> zero much of the time.
> 
> 2) Not processing fast enough. Samza performed about the same on both 
> medium-size records (100B) and small records (10B), while Spark scales very 
> well on small records (over 1M/s).
> 
> The first problem is solved by increasing the buffer size. This ticket 
> addresses the second problem with three major improvements:
> 
> - Option to turn off timer metrics calculation: one of the main time sinks in 
> Samza processing turns out to be just maintaining the timer metrics. While it is 
> usef

Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-08 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/#review155376
---




samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (line 
155)
<https://reviews.apache.org/r/53282/#comment225244>

Maybe 'timer' or 'highResolutionClock' instead, since 'clock' can be confused 
with currentTimeMillis? As Jagadish pointed out in an unrelated context, they're 
not equivalent since one returns epoch time and the other is only useful for 
time differences.
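
A small Java sketch of the distinction, in case it helps. NanoClock and ClockSketch are made-up names for illustration, not the interface in this RB; System.nanoTime and System.currentTimeMillis are the standard JDK calls.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical single-method interface, so it can be supplied as a lambda or method reference.
@FunctionalInterface
interface NanoClock {
  long nanoTime();
}

final class ClockSketch {
  // Measures how long a task takes using only the difference of two clock readings.
  static long elapsedNanos(NanoClock clock, Runnable task) {
    long start = clock.nanoTime();
    task.run();
    return clock.nanoTime() - start;
  }

  public static void main(String[] args) {
    NanoClock timer = System::nanoTime;            // arbitrary origin; only differences are meaningful
    long epochMillis = System.currentTimeMillis(); // wall-clock time since the Unix epoch
    long elapsed = elapsedNanos(timer, () -> { });
    System.out.println("epoch millis: " + epochMillis);
    System.out.println("elapsed micros: " + TimeUnit.NANOSECONDS.toMicros(elapsed));
  }
}
```

A name like 'timer' makes it harder to mistake the first kind of value for the second.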



samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
 (line 138)
<https://reviews.apache.org/r/53282/#comment225247>

Only tangentially related to RB, but this feels wrong in terms of config 
(and code) dependencies: StorageEngine is reading the Metrics configuration out 
of the entire container context, so that it can choose what clock the Timer 
should use.

Haven't thought this through, but maybe we should refactor things to move 
the clock to the Timer class instead?


- Prateek Maheshwari


On Nov. 7, 2016, 4:47 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53282/
> ---
> 
> (Updated Nov. 7, 2016, 4:47 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Jake Maes, and Navina Ramesh.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> In recent experiments with a Samza batch job (consuming HDFS data on Hadoop), 
> the results were subpar compared to MapReduce and Spark. Looking at the metrics 
> closely, we found two basic problems:
> 
> 1) Not enough data to process. The unprocessed message queue length was 
> zero much of the time.
> 
> 2) Not processing fast enough. Samza performed about the same on both 
> medium-size records (100B) and small records (10B), while Spark scales very 
> well on small records (over 1M/s).
> 
> The first problem is solved by increasing the buffer size. This ticket 
> addresses the second problem with three major improvements:
> 
> - Option to turn off timer metrics calculation: one of the main time sinks in 
> Samza processing turns out to be just maintaining the timer metrics. While they 
> are useful for debugging, they become a bottleneck when running a stable job at 
> high throughput. In my test job, which consumes 8M mock messages, it took 30 
> secs with timer metrics on. After turning them off, it took only 14 secs.
> 
> - Java coding improvements: The AsyncRunLoop code can be further optimized 
> for efficiency. Some of the thread-safe data structures I was using are not 
> optimal for performance (Collections.synchronizedSet). I switched to 
> CopyOnWriteArraySet, which performs far better here because reads dominate and 
> the set is small.
> 
> - Specific handling for in-order processing: AsyncRunLoop handles the 
> callbacks the same way regardless of whether processing is in-order or 
> out-of-order (max concurrency > 1), which incurs considerable cost. Simplifying 
> the logic for in-order handling improves performance.
> 
> After all three improvements, my test job with mock input (8M messages) can 
> be processed within 8 secs (down from the original 30 secs), so it's 1M/s for 
> one CPU core.
> 
> For the performance benchmark jobs running in Hadoop, we also see a 4x 
> improvement with all the fixes above. Please take a look at the attached 
> spreadsheet (see the numbers for fix (timer metrics turned off) and 
> fix2 (all three together)).
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
> 609a956a1f2fa97419c2f66fe2fb6876aaaeecd0 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
> 8fac8155c7f64e67d4a39ec6943f98da1e1d63d9 
>   samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
> 052b3b91ec609ca6288662cfa2d3e71b0273d020 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
> 9b700998d2af040c6734289f7f28bbd78c36bd2c 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
> 132cf59eb593524a4cac134aeceeeb37a4c74b1f 
>   samza-core/src/main/java/org/apache/samza/util/TimerClock.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/util/Utils.java 
> 472e0a59d5aa992b136292c8a3347c311e2cd606 
>   samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala 
> c3fd8bfb2e16a4c5146d34682d04cb1d4e9bbe72 
>   samza-core/sr

Re: Review Request 53453: Add optional interface for SystemConsumer checkpontListener() for checkpoint notifications

2016-11-08 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53453/#review155377
---



Some minor code style/documentation related comments. One correctness question.


samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointListener.java 
(line 25)
<https://reviews.apache.org/r/53453/#comment225248>

No comma after SystemConsumers



samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java (line 134)
<https://reviews.apache.org/r/53453/#comment225249>

s/checkpoint/register the same offset.

Also, maintain previous line width for comment.



samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 
77)
<https://reviews.apache.org/r/53453/#comment225250>

Strongly prefer not adding empty Map() as default value here (and maybe 
clean up the other one too). See my comment on Xinyu's HDFS performance RB for 
explanation.



samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 
146)
<https://reviews.apache.org/r/53453/#comment225252>

s/ones/one



samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 
239)
<https://reviews.apache.org/r/53453/#comment225255>

Are you missing a foreach here?

I think you need something like:
lastProcessedOffsets.get(taskName).foreach { sspToOffsets =>
  sspToOffsets.foreach { case (ssp, checkpoint) =>
    offsetManagerMetrics.checkpointedOffsets(ssp).set(checkpoint)
  }
}

If the version above is correct, can we add a test for this? It's easy to 
miss.



samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 
243)
<https://reviews.apache.org/r/53453/#comment225257>

Can remove comment, same information in next line.



samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 
244)
<https://reviews.apache.org/r/53453/#comment225258>

Space after 'case' and after ':'



samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 
245)
<https://reviews.apache.org/r/53453/#comment225259>

s/is an empty list/is empty



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (line 
361)
<https://reviews.apache.org/r/53453/#comment225260>

Indent by 2
Space before and after =>
Space b/w map and {



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (line 
366)
<https://reviews.apache.org/r/53453/#comment225261>

Indent by 2.


- Prateek Maheshwari


On Nov. 4, 2016, 4:23 p.m., Boris Shkolnik wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53453/
> ---
> 
> (Updated Nov. 4, 2016, 4:23 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-1042
> https://issues.apache.org/jira/browse/SAMZA-1042
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Add optional interface for SystemConsumer checkpontListener() for checkpoint 
> notifications.
> 
> 
> Diffs
> -
> 
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointListener.java 
> PRE-CREATION 
>   samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java 
> 8dfcc7499659442aabd3085a8787475fe38f29db 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
> c41eadb70f4675816245f7ac40f0db2fc16335f0 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> e0468ee89c89fd720834461771ebb36475475bcb 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
> cb78223f1b59a78bbeb1e42b5974670a53def504 
> 
> Diff: https://reviews.apache.org/r/53453/diff/
> 
> 
> Testing
> ---
> 
> gradlew test.
> manual testing.
> 
> 
> Thanks,
> 
> Boris Shkolnik
> 
>



Re: Review Request 53297: Initial version of adding metrics into samza rest.

2016-11-08 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53297/#review155380
---




docs/learn/documentation/versioned/rest/monitors.md (line 100)
<https://reviews.apache.org/r/53297/#comment225263>

This section could be more concise.



samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java (line 
36)
<https://reviews.apache.org/r/53297/#comment225264>

Minor: private constructors for helper classes are pretty universal, don't 
think they need a comment.



samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java (line 
40)
<https://reviews.apache.org/r/53297/#comment225265>

This javadoc doesn't add any more information than the method signature 
already provides. Prefer removing.



samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java (line 
45)
<https://reviews.apache.org/r/53297/#comment225266>

Not sure this specific method (for classloading this particular class from 
configs) deserves a new Util class. This is a general pattern used in other 
places, maybe we should extract that as a util instead.

May be worth revisiting if/how we want to share code b/w samza-rest and 
samza-core. Ideally samza-rest shouldn't depend on samza-core, and shared stuff 
should be pulled out to samza-api or samza-common (which doesn't exist yet).
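
Roughly what I have in mind for the shared util, as a sketch only: ClassLoaderHelper and fromClassName are hypothetical names (nothing like this exists in the patch), and it assumes the target class has a no-arg constructor.

```java
// Hypothetical generic helper; the name and signature are illustrative, not an existing Samza API.
final class ClassLoaderHelper {
  private ClassLoaderHelper() { }

  // Instantiates className via its no-arg constructor and casts it to expectedType
  // (the cast throws ClassCastException if the class is not a subtype).
  static <T> T fromClassName(String className, Class<T> expectedType) {
    try {
      Class<?> clazz = Class.forName(className);
      return expectedType.cast(clazz.getDeclaredConstructor().newInstance());
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Could not instantiate " + className, e);
    }
  }
}
```

Callers that read a factory class name out of config could then share this one method instead of each growing its own loader class.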



samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java (line 55)
<https://reviews.apache.org/r/53297/#comment225267>

What is this config, and what does container refer to here? The samza-rest 
service container?


- Prateek Maheshwari


On Nov. 8, 2016, 3:13 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53297/
> ---
> 
> (Updated Nov. 8, 2016, 3:13 p.m.)
> 
> 
> Review request for samza and Jake Maes.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This patch enables users to define custom reporters to send metrics 
> from the monitors. The configuration required to define the metrics 
> reporters follows the same convention as that of Samza jobs.
> 
> 
> Diffs
> -
> 
>   docs/learn/documentation/versioned/rest/monitors.md 
> 46678bbe5fed99f767c3324dc9578ee1a64cec66 
>   samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java 
> PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> e0468ee89c89fd720834461771ebb36475475bcb 
>   
> samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
>  f24beb1e099dd44b15b475e0a4a7f70560c6965e 
>   samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java 
> 47b0663637f6db187d86961377ee3ee203b73fdb 
>   samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java 
> 2a3e83a24a5343bb53b93fc9d0a647c1b253714b 
>   samza-rest/src/test/java/org/apache/samza/rest/TestSamzaRestService.java 
> PRE-CREATION 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
>  8a5b4aaea6e11a5af999f12d50e5b6135dbc70ca 
> 
> Diff: https://reviews.apache.org/r/53297/diff/
> 
> 
> Testing
> ---
> 
> Unit tests are done to verify the intended functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>



Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-09 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/#review155478
---


Ship it!




Looks pretty good, thanks!


samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java (line 
134)
<https://reviews.apache.org/r/53282/#comment225446>

Maybe make private if not being tested.



samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala (line 23)
<https://reviews.apache.org/r/53282/#comment225447>

Unused import?



samza-core/src/main/scala/org/apache/samza/util/Util.scala (line 393)
<https://reviews.apache.org/r/53282/#comment225451>

s/TimerClock/HighResolutionClock


- Prateek Maheshwari


On Nov. 9, 2016, 11:10 a.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53282/
> ---
> 
> (Updated Nov. 9, 2016, 11:10 a.m.)
> 
> 
> Review request for samza, Chris Pettitt, Jake Maes, and Navina Ramesh.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> In recent experiments with a Samza batch job (consuming HDFS data on Hadoop), 
> the results were subpar compared to MapReduce and Spark. Looking at the metrics 
> closely, we found two basic problems:
> 
> 1) Not enough data to process. The unprocessed message queue length was 
> zero much of the time.
> 
> 2) Not processing fast enough. Samza performed about the same on both 
> medium-size records (100B) and small records (10B), while Spark scales very 
> well on small records (over 1M/s).
> 
> The first problem is solved by increasing the buffer size. This ticket 
> addresses the second problem with three major improvements:
> 
> - Option to turn off timer metrics calculation: one of the main time sinks in 
> Samza processing turns out to be just maintaining the timer metrics. While they 
> are useful for debugging, they become a bottleneck when running a stable job at 
> high throughput. In my test job, which consumes 8M mock messages, it took 30 
> secs with timer metrics on. After turning them off, it took only 14 secs.
> 
> - Java coding improvements: The AsyncRunLoop code can be further optimized 
> for efficiency. Some of the thread-safe data structures I was using are not 
> optimal for performance (Collections.synchronizedSet). I switched to 
> CopyOnWriteArraySet, which performs far better here because reads dominate and 
> the set is small.
> 
> - Specific handling for in-order processing: AsyncRunLoop handles the 
> callbacks the same way regardless of whether processing is in-order or 
> out-of-order (max concurrency > 1), which incurs considerable cost. Simplifying 
> the logic for in-order handling improves performance.
> 
> After all three improvements, my test job with mock input (8M messages) can 
> be processed within 8 secs (down from the original 30 secs), so it's 1M/s for 
> one CPU core.
> 
> For the performance benchmark jobs running in Hadoop, we also see a 4x 
> improvement with all the fixes above. Please take a look at the attached 
> spreadsheet (see the numbers for fix (timer metrics turned off) and 
> fix2 (all three together)).
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
> 609a956a1f2fa97419c2f66fe2fb6876aaaeecd0 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
> 8fac8155c7f64e67d4a39ec6943f98da1e1d63d9 
>   samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
> 052b3b91ec609ca6288662cfa2d3e71b0273d020 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
> 9b700998d2af040c6734289f7f28bbd78c36bd2c 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
> 132cf59eb593524a4cac134aeceeeb37a4c74b1f 
>   samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java 
> 69ba441ed087305dfe4e1272b00fad67b644e13f 
>   
> samza-core/src/main/j

Re: Review Request 53297: Initial version of adding metrics into samza rest.

2016-11-09 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53297/#review155511
---




samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java (line 55)
<https://reviews.apache.org/r/53297/#comment225485>

What's the execution unit here? The samza-rest server? The monitor? The 
container being monitored?


- Prateek Maheshwari


On Nov. 9, 2016, 2:50 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53297/
> ---
> 
> (Updated Nov. 9, 2016, 2:50 p.m.)
> 
> 
> Review request for samza and Jake Maes.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This patch enables users to define custom reporters to send metrics 
> from the monitors. The configuration required to define the metrics 
> reporters follows the same convention as that of Samza jobs.
> 
> 
> Diffs
> -
> 
>   docs/learn/documentation/versioned/rest/monitors.md 
> 46678bbe5fed99f767c3324dc9578ee1a64cec66 
>   docs/learn/documentation/versioned/rest/overview.md 
> c382f032843cce696a445ff110e87a8198cc96d7 
>   samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java 
> PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> e0468ee89c89fd720834461771ebb36475475bcb 
>   
> samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
>  f24beb1e099dd44b15b475e0a4a7f70560c6965e 
>   samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java 
> 47b0663637f6db187d86961377ee3ee203b73fdb 
>   samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java 
> 2a3e83a24a5343bb53b93fc9d0a647c1b253714b 
>   samza-rest/src/test/java/org/apache/samza/rest/TestSamzaRestService.java 
> PRE-CREATION 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
>  8a5b4aaea6e11a5af999f12d50e5b6135dbc70ca 
> 
> Diff: https://reviews.apache.org/r/53297/diff/
> 
> 
> Testing
> ---
> 
> Unit tests are done to verify the intended functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>



Re: Review Request 53297: Initial version of adding metrics into samza rest.

2016-11-09 Thread Prateek Maheshwari


> On Nov. 5, 2016, 11:47 a.m., Jake Maes wrote:
> > samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java, line 
> > 75
> > 
> >
> > I don't think the MetricsConfig constructure takes a subset. 
> > 
> > I think it takes the root and expects to find the "metrics" prefix
> 
> Jake Maes wrote:
> s/constructure/constructor
> 
> phonetic brain fail
> 
> Shanthoosh Venkataraman wrote:
> Yes, that's true. It expects the root and finds the metrics prefix. 
> Hence, stripPrefix is passed on as false, so that prefix isn't removed. This 
> just selects the config subset that starts with METRICS_PREFIX, without 
> removing the prefix. The goal is to not pass on the entire config object and 
> pass only metrics related config into MetricsConfig constructor.

Regarding the goal: That's not how the XConfig classes are intended to be used, 
so it's not really necessary to do this. For example, see the implicit 
conversion b/w Config and XConfig. Prefer passing in config.
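
For reference, a rough Java sketch of what the prefix selection amounts to. ConfigSubsetSketch is a hypothetical helper over a plain Map, not Samza's Config class, and the keys used with it are only illustrative.

```java
import java.util.Map;
import java.util.TreeMap;

final class ConfigSubsetSketch {
  private ConfigSubsetSketch() { }

  // Returns the entries whose key starts with prefix; the prefix is dropped
  // from the returned keys only when stripPrefix is true.
  static Map<String, String> subset(Map<String, String> config, String prefix, boolean stripPrefix) {
    Map<String, String> result = new TreeMap<>();
    for (Map.Entry<String, String> e : config.entrySet()) {
      if (e.getKey().startsWith(prefix)) {
        String key = stripPrefix ? e.getKey().substring(prefix.length()) : e.getKey();
        result.put(key, e.getValue());
      }
    }
    return result;
  }
}
```

With stripPrefix = false, a consumer that only reads keys under that prefix sees exactly what it would see in the full config, which is why the extra filtering step buys nothing.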


- Prateek


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53297/#review155061
---


On Nov. 9, 2016, 2:50 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53297/
> ---
> 
> (Updated Nov. 9, 2016, 2:50 p.m.)
> 
> 
> Review request for samza and Jake Maes.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This patch enables users to define custom reporters to send metrics 
> from the monitors. The configuration required to define the metrics 
> reporters follows the same convention as that of Samza jobs.
> 
> 
> Diffs
> -
> 
>   docs/learn/documentation/versioned/rest/monitors.md 
> 46678bbe5fed99f767c3324dc9578ee1a64cec66 
>   docs/learn/documentation/versioned/rest/overview.md 
> c382f032843cce696a445ff110e87a8198cc96d7 
>   samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java 
> PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> e0468ee89c89fd720834461771ebb36475475bcb 
>   
> samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
>  f24beb1e099dd44b15b475e0a4a7f70560c6965e 
>   samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java 
> 47b0663637f6db187d86961377ee3ee203b73fdb 
>   samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java 
> 2a3e83a24a5343bb53b93fc9d0a647c1b253714b 
>   samza-rest/src/test/java/org/apache/samza/rest/TestSamzaRestService.java 
> PRE-CREATION 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
>  8a5b4aaea6e11a5af999f12d50e5b6135dbc70ca 
> 
> Diff: https://reviews.apache.org/r/53297/diff/
> 
> 
> Testing
> ---
> 
> Unit tests are done to verify the intended functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>



Re: Review Request 53297: Initial version of adding metrics into samza rest.

2016-11-10 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53297/#review155625
---



Looks pretty good to me. Some final minor comments and questions.


docs/learn/documentation/versioned/rest/monitors.md (line 98)
<https://reviews.apache.org/r/53297/#comment225624>

Maybe: "are the same as that of Samza jobs." and make "that of Samza jobs" 
the configuration link and remove next line?



samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java (line 
30)
<https://reviews.apache.org/r/53297/#comment225627>

Remove "contains reusable methods which" and "based upon configuration". 
Don't mind removing the whole comment either.



samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java (line 
39)
<https://reviews.apache.org/r/53297/#comment225628>

I meant remove the whole javadoc including @params. There's no useful 
additional information here.



samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java (line 
52)
<https://reviews.apache.org/r/53297/#comment225632>

Minor, your call: Not a big fan of the "hanging towers of parameters" code 
style. Maybe extract variable?



samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
 (line 45)
<https://reviews.apache.org/r/53297/#comment225629>

Maybe use JavaConverters._ and .asScala?



samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java (line 27)
<https://reviews.apache.org/r/53297/#comment225630>

Unused import.



samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java (line 91)
<https://reviews.apache.org/r/53297/#comment225631>

See previous comment about just passing the entire config. Does that not 
work?



samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java (line 103)
<https://reviews.apache.org/r/53297/#comment225638>

Don't understand this interface. What's a SchedulingProvider, and how is it 
different than a ScheduledExecutorService?

schedulingProvider should probably be stopped in stop() in this class 
and not in SamzaMonitorService since this class owns it. Either that, or move 
instantiation to SamzaMonitorService too.
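
A minimal Java sketch of the ownership rule I mean, using a plain ScheduledExecutorService. OwningService is a hypothetical class for illustration, not SamzaRestService, and the one-second termination timeout is an arbitrary choice.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical service: the class that creates the scheduler is also the one that shuts it down.
final class OwningService {
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

  void start(Runnable monitor, long periodMs) {
    scheduler.scheduleAtFixedRate(monitor, 0, periodMs, TimeUnit.MILLISECONDS);
  }

  // Shuts the scheduler down here rather than in a collaborator that merely uses it.
  void stop() {
    scheduler.shutdown();
    try {
      if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
        scheduler.shutdownNow();
      }
    } catch (InterruptedException e) {
      scheduler.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
    }
  }

  boolean isStopped() {
    return scheduler.isShutdown();
  }
}
```

The alternative is equally fine: create the scheduler inside the class that stops it, so creation and shutdown still live together.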



samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java (line 107)
<https://reviews.apache.org/r/53297/#comment225636>

Should this be called during start()? Same for stop().



samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java (line 164)
<https://reviews.apache.org/r/53297/#comment225633>

debug?



samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java (line 166)
<https://reviews.apache.org/r/53297/#comment225634>

debug?



samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 
(line 46)
<https://reviews.apache.org/r/53297/#comment225641>

Maybe use JavaConverters._ and .asScala?


- Prateek Maheshwari


On Nov. 9, 2016, 5:04 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53297/
> ---
> 
> (Updated Nov. 9, 2016, 5:04 p.m.)
> 
> 
> Review request for samza and Jake Maes.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This patch enables users to define custom reporters to send metrics 
> from the monitors. The configuration required to define the metrics 
> reporters follows the same convention as that of Samza jobs.
> 
> 
> Diffs
> -
> 
>   docs/learn/documentation/versioned/rest/monitors.md 
> 46678bbe5fed99f767c3324dc9578ee1a64cec66 
>   samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java 
> PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> e0468ee89c89fd720834461771ebb36475475bcb 
>   
> samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
>  f24beb1e099dd44b15b475e0a4a7f70560c6965e 
>   samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java 
> 47b0663637f6db187d86961377ee3ee203b73fdb 
>   samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java 
> 2a3e83a24a5343bb53b93fc9d0a647c1b253714b 
>   samza-rest/src/test/java/org/apache/samza/rest/TestSamzaRestService.java 
> PRE-CREATION 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
>  8a5b4aaea6e11a5af999f12d50e5b6135dbc70ca 
> 
> Diff: https://reviews.apache.org/r/53297/diff/
> 
> 
> Testing
> ---
> 
> Unit tests are done to verify the intended functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>



Re: Review Request 53297: Initial version of adding metrics into samza rest.

2016-11-10 Thread Prateek Maheshwari


> On Nov. 10, 2016, 10:57 a.m., Prateek Maheshwari wrote:
> > samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java, line 
> > 103
> > <https://reviews.apache.org/r/53297/diff/5/?file=1560180#file1560180line103>
> >
> > Don't understand this interface. What's a SchedulingProvider, and how 
> > is it different than a ScheduledExecutorService?
> > 
> > schedulingProvider should probably be stopped in stop() in this 
> > class and not in SamzaMonitorService since this class owns it. Either that, 
> > or move instantiation to SamzaMonitorService too.
> 
> Shanthoosh Venkataraman wrote:
> The SchedulingProvider interface looks similar to ExecutorService. 
> Not sure what value this interface adds. I guess it is not 
> required and we could just use ExecutorService. Added a JIRA for it, since 
> it's not in the scope of this patch. We could revisit it later.

Sounds good, thanks.


> On Nov. 10, 2016, 10:57 a.m., Prateek Maheshwari wrote:
> > samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java, line 
> > 107
> > <https://reviews.apache.org/r/53297/diff/5/?file=1560180#file1560180line107>
> >
> > Should this be called during start()? Same for stop().
> 
> Shanthoosh Venkataraman wrote:
> To accomplish this, both of them (SamzaRestService, SamzaMonitorService) 
> would have to be coupled. Logically they are separate entities: one is 
> responsible for resources and the other for monitors. Currently MonitorService 
> & RestService are not coupled with each other. It would be useful to keep them 
> separate (in the future, we could have SamzaRestService and SamzaMonitorService 
> deployed separately when we have too many monitors and resources in them). I 
> would prefer not to have this coupling.

Thanks for the explanation, makes sense to separate if they're independent.


- Prateek


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53297/#review155625
---


On Nov. 10, 2016, 4:22 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53297/
> ---
> 
> (Updated Nov. 10, 2016, 4:22 p.m.)
> 
> 
> Review request for samza and Jake Maes.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This patch enables users to define custom reporters to send metrics 
> from the monitors. The configuration required to define the metrics 
> reporters follows the same convention as that of Samza jobs.
> 
> 
> Diffs
> -
> 
>   docs/learn/documentation/versioned/rest/monitors.md 
> 46678bbe5fed99f767c3324dc9578ee1a64cec66 
>   samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java 
> PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> e0468ee89c89fd720834461771ebb36475475bcb 
>   
> samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
>  f24beb1e099dd44b15b475e0a4a7f70560c6965e 
>   samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java 
> 2a3e83a24a5343bb53b93fc9d0a647c1b253714b 
>   samza-rest/src/test/java/org/apache/samza/rest/TestSamzaRestService.java 
> PRE-CREATION 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
>  8a5b4aaea6e11a5af999f12d50e5b6135dbc70ca 
> 
> Diff: https://reviews.apache.org/r/53297/diff/
> 
> 
> Testing
> ---
> 
> Unit tests are done to verify the intended functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>



Re: Review Request 53297: Initial version of adding metrics into samza rest.

2016-11-10 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53297/#review155689
---


Ship it!




Ship It!

- Prateek Maheshwari





Re: Review Request 53826: added user document for Checkpoint callbacks

2016-11-17 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53826/#review156279
---




docs/learn/documentation/versioned/container/checkpointing.md (line 126)
<https://reviews.apache.org/r/53826/#comment226477>

Somewhat redundant with previous sentence, can merge.



docs/learn/documentation/versioned/container/checkpointing.md (line 128)
<https://reviews.apache.org/r/53826/#comment226478>

"One such use case"

"High Level Kafka Consumer", since that's how Kafka refers to it.



docs/learn/documentation/versioned/container/checkpointing.md (line 129)
<https://reviews.apache.org/r/53826/#comment226479>

"rely on ACKs"

"An example would be"

"can only be done by the consumer"



docs/learn/documentation/versioned/container/checkpointing.md (line 132)
<https://reviews.apache.org/r/53826/#comment226480>

"implement the"



docs/learn/documentation/versioned/container/checkpointing.md (line 138)
<https://reviews.apache.org/r/53826/#comment226481>

Should we clarify what the 'offsets' parameter passed to this callback 
contains, and what the checkpointing behavior is? Specifically, that Samza will still 
save all offsets it knows about in the checkpoint, notify listeners about all 
of them, and (on restart) attempt to `seek()` from the checkpointed offset for 
all systems?

"Note: The" or "Note that the"


- Prateek Maheshwari


On Nov. 16, 2016, 3:28 p.m., Boris Shkolnik wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53826/
> ---
> 
> (Updated Nov. 16, 2016, 3:28 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-1046
> https://issues.apache.org/jira/browse/SAMZA-1046
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> added user documentation for Checkpoint callbacks
> 
> 
> Diffs
> -
> 
>   docs/learn/documentation/versioned/container/checkpointing.md 
> 6f8c6d694be92f973af456ddd518d70540abe5c3 
> 
> Diff: https://reviews.apache.org/r/53826/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Boris Shkolnik
> 
>



Re: Review Request 53826: added user document for Checkpoint callbacks

2016-11-22 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53826/#review156607
---


Ship it!




Looks good. Minor comments.


docs/learn/documentation/versioned/container/checkpointing.md (line 126)
<https://reviews.apache.org/r/53826/#comment226779>

Can be moved to previous line.



docs/learn/documentation/versioned/container/checkpointing.md (line 129)
<https://reviews.apache.org/r/53826/#comment226780>

What does the "push system" mean? Maybe clarify/remove?



docs/learn/documentation/versioned/container/checkpointing.md (line 138)
<https://reviews.apache.org/r/53826/#comment226781>

Multiple "and"s in the sentence. Maybe "... for a task. These are the ..."?


- Prateek Maheshwari


On Nov. 18, 2016, 4:52 p.m., Boris Shkolnik wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53826/
> ---
> 
> (Updated Nov. 18, 2016, 4:52 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-1046
> https://issues.apache.org/jira/browse/SAMZA-1046
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> added user documentation for Checkpoint callbacks
> 
> 
> Diffs
> -
> 
>   docs/learn/documentation/versioned/container/checkpointing.md 
> 6f8c6d694be92f973af456ddd518d70540abe5c3 
> 
> Diff: https://reviews.apache.org/r/53826/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Boris Shkolnik
> 
>



Re: Review Request 54020: Operator API refactoring

2016-11-28 Thread Prateek Maheshwari
 
  
samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java 
44efa6dc8dd828c5c111dd1979e74a13702ffd91 
  samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java 
de7bba5a1745760ef80f060c8131a60dcc069871 

Diff: https://reviews.apache.org/r/54020/diff/


Testing
---

./gradlew clean build works.


Thanks,

Prateek Maheshwari



Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2017-01-25 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review163035
---




samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 97)
<https://reviews.apache.org/r/52476/#comment234449>

"Got default storage ..."



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 137)
<https://reviews.apache.org/r/52476/#comment234451>

No space before ":" here and elsewhere, including method type annotations 
(e.g. line 169, 185).



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 152)
<https://reviews.apache.org/r/52476/#comment234450>

We should log which directory we're using for the store here at INFO.


- Prateek Maheshwari


On Oct. 22, 2016, 3:06 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> ---
> 
> (Updated Oct. 22, 2016, 3:06 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Every local task store is backed by a Kafka changelog topic. Due to log 
> compaction, delete tombstones in the changelog topic have a TTL of 
> delete.retention.ms. Replaying events from a changelog that is missing 
> delete tombstones would create an inconsistent local store (because some 
> delete events are missing). During container startup, this patch deletes any 
> local store for which the difference between the current time and the last 
> modified time of the offset file is greater than delete.retention.ms.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
> 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
> be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   
> samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
>  4d40f520e54beb643acd8410c772b75e2f6a9162 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
> 973ab8cfb3d248bec7efe5e338f5e667f097556d 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> ---
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>



Re: Review Request 52476: SAMZA-1083 : Do not load task store which are older than delete tombstones.

2017-02-08 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164749
---




samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 32)
<https://reviews.apache.org/r/52476/#comment236506>

Doesn't look like this is fixed? Did you miss updating with a patch?


- Prateek Maheshwari


On Feb. 8, 2017, 11:09 a.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> ---
> 
> (Updated Feb. 8, 2017, 11:09 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Every local task store is backed by a Kafka changelog topic. Due to log 
> compaction, delete tombstones in the changelog topic have a TTL of 
> delete.retention.ms. Replaying events from a changelog that is missing 
> delete tombstones would create an inconsistent local store (because some 
> delete events are missing). During container startup, this patch deletes any 
> local store for which the difference between the current time and the last 
> modified time of the offset file is greater than delete.retention.ms.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
> 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
> be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   
> samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
>  4d40f520e54beb643acd8410c772b75e2f6a9162 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
> 973ab8cfb3d248bec7efe5e338f5e667f097556d 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> ---
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>



Re: Review Request 52476: SAMZA-1083 : Do not load task store which are older than delete tombstones.

2017-02-08 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164786
---


Fix it, then Ship it!




LGTM, thanks. A few code style/documentation-related comments.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 107)
<https://reviews.apache.org/r/52476/#comment236555>

Can delete, doesn't describe the entire block, and unnecessary for the 
first part.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 113)
<https://reviews.apache.org/r/52476/#comment236557>

s/of the store/for the store.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 120)
<https://reviews.apache.org/r/52476/#comment236554>

@param doesn't go in the method description, use {@code}



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 135)
<https://reviews.apache.org/r/52476/#comment236559>

Explain what stale means here.

Use @code instead of @param here.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 138)
<https://reviews.apache.org/r/52476/#comment236561>

Can just say "true if the store is stale". Description of what stale means 
should go in the method description.
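
To illustrate the Javadoc comments above, here is a minimal Java sketch of how such a method might be documented. It is not the TaskStorageManager code under review (which is Scala and is not shown in this thread): the class name `StaleStoreCheck`, the method name `isStaleStore`, and all parameter names are hypothetical. The definition of "stale" is taken from the patch description, and parameters are referenced with `{@code}` in the method description while `@return` stays short.

```java
/** Hypothetical helper for illustration only; not the actual Samza class. */
final class StaleStoreCheck {
  private StaleStoreCheck() { }

  /**
   * Checks whether a local store is stale. A store is stale when the time elapsed since its
   * offset file was last modified, measured from {@code nowMs}, is greater than
   * {@code deleteRetentionMs}. Delete tombstones may already have been compacted out of the
   * changelog by then, so replaying it could produce an inconsistent store.
   *
   * @param offsetFileLastModifiedMs last modified time of the store's offset file, in epoch millis
   * @param deleteRetentionMs the changelog topic's delete.retention.ms value
   * @param nowMs the current time, in epoch millis
   * @return true if the store is stale
   */
  static boolean isStaleStore(long offsetFileLastModifiedMs, long deleteRetentionMs, long nowMs) {
    return nowMs - offsetFileLastModifiedMs > deleteRetentionMs;
  }
}
```

Passing the timestamps in as arguments, rather than reading the clock and the file inside the method, is a choice made only to keep the sketch easy to test.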



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 159)
<https://reviews.apache.org/r/52476/#comment236562>

No comma before 'if' if it's the last clause in the sentence.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 196)
<https://reviews.apache.org/r/52476/#comment236565>

s/in the store/for the store.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 198)
<https://reviews.apache.org/r/52476/#comment236563>

No space before colon, here and other places in this file.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 202)
<https://reviews.apache.org/r/52476/#comment236564>

Thanks!


- Prateek Maheshwari


On Feb. 8, 2017, 1:37 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> ---
> 
> (Updated Feb. 8, 2017, 1:37 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Every local task store is backed by a Kafka changelog topic. Due to log 
> compaction, delete tombstones in the changelog topic have a TTL of 
> delete.retention.ms. Replaying events from a changelog that is missing 
> delete tombstones would create an inconsistent local store (because some 
> delete events are missing). During container startup, this patch deletes any 
> local store for which the difference between the current time and the last 
> modified time of the offset file is greater than delete.retention.ms.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
> 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
> a3587d0a40c57374ee1742234929d444e381e42d 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> c3308bfd7de04c335fef6cb66baa29286a230080 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   
> samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
>  4d40f520e54beb643acd8410c772b75e2f6a9162 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
> 9320cf744ff90d647a198b51cb06d2a526fe68fa 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> ---
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>



Re: Review Request 52168: Tasks endpoint to list the complete details of all tasks related to a job

2017-02-23 Thread Prateek Maheshwari
 that we've documented? Bad 
request is usually a 400, not a 404. Does it mean a 404 in javax.ws?
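
As a small reference for the status codes in question, here is a hedged Java sketch using the constants from `java.net.HttpURLConnection` in the JDK. The `StatusCodes` class and its method names are hypothetical; the `Responses` class under review is not shown in this thread and presumably builds full response objects rather than bare integers.

```java
import java.net.HttpURLConnection;

/** Hypothetical helper for illustration only; not the Responses class under review. */
final class StatusCodes {
  private StatusCodes() { }

  /** Status for a request that is malformed or fails validation (HTTP 400). */
  static int badRequest() {
    return HttpURLConnection.HTTP_BAD_REQUEST;
  }

  /** Status for a well-formed request whose target resource does not exist (HTTP 404). */
  static int notFound() {
    return HttpURLConnection.HTTP_NOT_FOUND;
  }
}
```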



samza-rest/src/main/java/org/apache/samza/rest/resources/Responses.java (line 
44)
<https://reviews.apache.org/r/52168/#comment238532>

"Constructs a bad request (HTTP 400) response"



samza-rest/src/main/java/org/apache/samza/rest/resources/TasksResource.java 
(line 40)
<https://reviews.apache.org/r/52168/#comment238534>

If this resource is always for a particular job, shouldn't 
"/{jobName}/{jobId}" go here instead of on specific methods? Is that not 
allowed in javax.ws?



samza-rest/src/main/java/org/apache/samza/rest/resources/TasksResource.java 
(line 54)
<https://reviews.apache.org/r/52168/#comment238535>

This is assuming a particular implementation, what's the factory for in 
this case?



samza-rest/src/main/java/org/apache/samza/rest/resources/TasksResource.java 
(line 58)
<https://reviews.apache.org/r/52168/#comment238418>

Missing description.


- Prateek Maheshwari


On Feb. 15, 2017, 10:26 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52168/
> ---
> 
> (Updated Feb. 15, 2017, 10:26 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This patch contains the following changes:
>  * HTTP GET API to list the complete details of all the tasks that belong to 
> a job. 
>  * Refactored some methods in the coordinator stream to reuse the existing 
> functionality for getting jobConfig from the coordinator stream.
> 
> 
> Diffs
> -
> 
>   docs/learn/documentation/versioned/rest/resource-directory.md 
> 79746d1e2eb3491e4bd26c3c7cf6c7efd150d8ef 
>   docs/learn/documentation/versioned/rest/resources/tasks.md PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> df63b97e9d598ecd1840111ba490a723e410d089 
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 
> 022b480856483059cb9f837a08f97a718bc04c31 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 
> c4836f202f7eda1d4e71eac94fd48e46207b0316 
>   samza-rest/src/main/config/samza-rest.properties 
> 7be0b47d1466d2199ae278247e8d81522fb6a91c 
>   samza-rest/src/main/java/org/apache/samza/rest/model/Partition.java 
> PRE-CREATION 
>   samza-rest/src/main/java/org/apache/samza/rest/model/Task.java PRE-CREATION 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
>  4d8647f3e1e650632e38b47ba5a8a2dac004f545 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java 
> 067711a74e5b0d7277a9c8b2d2517b56e9cfbcca 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
>  a935c98730f85f448c688a6baf2e8ddffdbb2cb4 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java
>  11d93d4608d23a4e3fb3bfc50dfac35ab6dbdf3c 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java 
> PRE-CREATION 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxyFactory.java
>  PRE-CREATION 
>   samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxy.java 
> PRE-CREATION 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxyFactory.java
>  PRE-CREATION 
>   
> samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskResourceConfig.java
>  PRE-CREATION 
>   
> samza-rest/src/main/java/org/apache/samza/rest/resources/BaseResourceConfig.java
>  PRE-CREATION 
>   
> samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java
>  e0224c6bcf4aeaa336e5786ac472482507fcd382 
>   samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java 
> a566db598c284d69ea61af88fdc0851483d5a089 
>   
> samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java
>  527482d2ee55747e7b3f9c54c8a3b1afe7ad8797 
>   samza-rest/src/main/java/org/apache/samza/rest/resources/Responses.java 
> PRE-CREATION 
>   samza-rest/src/main/java/org/apache/samza/rest/resources/TasksResource.java 
> PRE-CREATION 
>   
> samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java
>  7db437b348ecd286185898b8f8ab0220d59da71a 
>   
> samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java
>  PRE-CREATION 
>   
> samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockInstallationFinder.java
>  PRE-CREATION 
>   
> samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockResourceFactory.java
>  PRE-CREATION 
>   
> samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java
>  PRE-CREATION 
>   
> samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxyFactory.java
>  PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/52168/diff/
> 
> 
> Testing
> ---
> 
> Manual and unit testing has been done to verify the apis.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>



Re: [DISCUSS] SEP-1: Semantics of ProcessorId in Samza

2017-03-23 Thread Prateek Maheshwari
Hi Navina,

Thanks for SEP-1, looks pretty good to me. A few questions/comments:

Implementation/Interface related:
1. Do you have any examples of custom processor IDs? Wondering what
information/classes ProcessorIdGenerator would need to be able to generate
one.
2. The default "static" getProcessorIdGeneratorFromConfig should be on the
ProcessorIdGenerator interface, not the JobCoordinator. Also, prefer
removing the fromConfig suffix from the method name and calling it create
instead of get? Not sure what the convention here is. @Xinyu, any
preferences?
3. +1 for removing the constructor parameter, but I think the
JobCoordinator should still only have a #getProcessorId method instead of
#getProcessorIdGenerator. Theoretically, a processor/container doesn't need
to generate multiple IDs, it only needs to know its own. @Jagadish, prefer
the more restrictive API unless you have a use case for the more general
one in mind.

Documentation/SEP related:
1. The SEP uses both ProcessorIdentifier and ProcessorIdGenerator as
synonyms. Let's update to use ProcessorIdGenerator consistently.
2. Minor: 'processor.id' configuration: I'm assuming this still needs to be
unique for each processor in the job? If so, probably worth calling out in
the SEP or configuration docs. We can also document it as deprecated and a
candidate for removal in near future (maybe 0.14?).

Thanks,
Prateek


On Tue, Mar 21, 2017 at 2:34 PM, Navina Ramesh (Apache) 
wrote:

> Hi everyone,
> I have updated the SEP based on all the feedback. Feel free to comment.
>
> I will start the [vote] mail thread, if there are no further questions
> within the next 24 hours.
>
> Thanks!
> Navina
>
> On Tue, Mar 21, 2017 at 10:33 AM, Navina Ramesh (Apache) <
> nav...@apache.org>
> wrote:
>
> > Hi Jagadish,
> > Thanks for the suggestion. You are right in that it should be the
> > responsibility of the JobCoordinator to assign identifiers.
> >
> > > I'm only wondering if this logic could instead reside inside the
> > Job Coordinator (which is internal to the StreamProcessor) instead of
> > relying on something external to it?
> >
> > I think this is a consequence of our initial StandaloneJobCoordinator,
> > which is pretty much a pass-through. I didn't see any usage for
> > getProcessorId() and was wondering why we put it in the JobCoordinator
> > interface. I think I should keep your design proposal from last year
> > handy :) Thanks for pitching in!
> >
> >
> > @All:
> > Yesterday, there was a discussion on the naming of the configuration used in
> > this SEP: whether it should be within the "job" scope or the "app" scope
> > (introduced by SAMZA-1041). The multi-stage feature and the fluent API for
> > Samza introduce the notion of an "application". Since the processorId
> > generator config applies to all jobs within a Samza application, we decided
> > to add the config for the generator under the "app" scope.
> > Further details on config scope changes can be found in SAMZA-1120.
> > 
> >
> > I will send out an update once I change the SEP based on yesterday's
> > meeting and Jagadish's idea.
> >
> > Thanks!
> > Navina
> >
> > On Mon, Mar 20, 2017 at 5:22 PM, Jagadish Venkatraman <
> > jagadish1...@gmail.com> wrote:
> >
> >> Thanks for writing this SEP!
> >>
> >> Here's an alternate approach instead of taking the "String processorId" as
> >> a parameter in the constructor. In my view, the "processorId" could be
> >> generated by the StreamProcessor internally (instead of being generated
> >> upstream and passed in). The Job Coordinator API could be as follows:
> >>
> >>
> >> public interface JobCoordinator {
> >>
> >>   // Could instead be: String getProcessorId();
> >>   ProcessorIdGenerator getProcessorIdGenerator();
> >>
> >>   JobModel getJobModel();
> >>
> >> }
> >>
> >> public interface ProcessorIdGenerator {
> >>
> >>   String getProcessorId();
> >> }
> >>
> >>
> >> For instance, a Yarn job coordinator can merely parse the ID from config
> >> and return it. A Zk-backed implementation of the job coordinator can agree
> >> on IDs using coordination through Zk. One nice property of this
> >> approach is that it keeps all logic related to coordination, agreement on
> >> the Job Model, and leader election (with potentially pluggable components for
> >> each) inside the JobCoordinator.
> >>
> >> To be clear, I'm all for the pluggability of ID generation logic that this
> >> SEP advocates. I'm only wondering if this logic could instead reside inside
> >> the Job Coordinator (which is internal to the StreamProcessor) instead of
> >> relying on something external to it?
> >>
> >> Of course, there may be other considerations around the way the current
> >> code is structured that may prevent this. Let me know if you agree with
> >> this change.
> >>
> >> Thanks

Re: [DISCUSS] SEP-1: Semantics of ProcessorId in Samza

2017-03-24 Thread Prateek Maheshwari
Hi Navina,

1. Assuming the environment can put the processor ID in the config,
ProcessorIdGenerator#generateProcessorId(Config config) makes sense.
Passing all of Config is rather broad, but I don't think we have an
environment specific subset class for config yet, so should be OK.

2. I don't yet see the value of putting the class-loading helper default
methods in multiple public interfaces. Seems like they're (usually) going
to be used by the framework, are pretty simple to write, and could probably
be written as a common Util method if we find them repetitive. Maybe skip
this method for now and add this once we have some clarity on this new
pattern?

If we keep it, let's name it "ProcessorIdGenerator#fromConfig(Config
config)" to be consistent with "ApplicationRunner#fromConfig(Config
config)"?

3. I think we're in agreement that the Processor doesn't need to have
access to the ProcessorIdGenerator, only the JobCoordinator does. With
JobCoordinator only exposing #getProcessorId I think we're good.
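
To make the agreed shape concrete, here is a minimal Java sketch of the two interfaces as discussed in points 1 and 3. It is an illustration under stated assumptions, not the SEP's final API: `Config` is modeled as a plain `Map<String, String>`, the implementing classes `ConfigProcessorIdGenerator` and `SimpleJobCoordinator` are hypothetical, and reading the ID from the `processor.id` key is only one possible environment-specific strategy.

```java
import java.util.Map;

/** Generates the ID for a processor; implementations are environment specific. */
interface ProcessorIdGenerator {
  // Assumption: Samza's Config is stood in for by a plain map in this sketch.
  String generateProcessorId(Map<String, String> config);
}

/** Exposes only the processor's own ID, not the generator. */
interface JobCoordinator {
  String getProcessorId();
}

/** Hypothetical generator that expects the environment to put the ID in the config. */
final class ConfigProcessorIdGenerator implements ProcessorIdGenerator {
  @Override
  public String generateProcessorId(Map<String, String> config) {
    String id = config.get("processor.id");
    if (id == null || id.isEmpty()) {
      throw new IllegalArgumentException("processor.id is not set");
    }
    return id;
  }
}

/** Hypothetical coordinator that resolves its ID once, at construction time. */
final class SimpleJobCoordinator implements JobCoordinator {
  private final String processorId;

  SimpleJobCoordinator(ProcessorIdGenerator generator, Map<String, String> config) {
    this.processorId = generator.generateProcessorId(config);
  }

  @Override
  public String getProcessorId() {
    return processorId;
  }
}
```

In this sketch the generator is a constructor argument that stays private to the coordinator, so callers of JobCoordinator can read the ID but never reach the generator itself.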

+1 from me with these minor changes. Thanks for the proposal!

Best,
Prateek

On Thu, Mar 23, 2017 at 11:35 PM, Navina R  wrote:

> Hi Prateek,
> > 1. Do you have any examples of custom processor IDs? Wondering what
> information/classes ProcessorIdGenerator would need to be able to generate
> one.
> Yeah. When I was trying to implement the proposal, I was wondering the
> same thing. However, it might end up being specific to the
> environment. In the case of Yarn, I would expect the AppMaster to still specify
> the processorID as an environment variable. In the case of Rain (which is a
> LinkedIn-specific deployment framework), it will probably be a combination
> of sliceId and instanceId. Given these variations, I am not sure what can
> be generic enough to encompass all this information. Maybe we can pass the
> config to the instance factory. But let me know if you have other ideas.
>
>
>  > 2. The default "static" getProcessorIdGeneratorFromConfig should be on
> the ProcessorIdGenerator interface, not the JobCoordinator. Also, prefer
> removing the fromConfig suffix from the method name and calling it create
> instead of get? Not sure what the convention here is.
> You are right. It should be in ProcessorIdGenerator. I can remove the
> fromConfig suffix. Should I just call it createInstance, similar to the
> Java APIs? I am not sure what the convention is either.
>
> > I think the JobCoordinator should still only have a #getProcessorId
> method instead of #getProcessorIdGenerator.
> JobCoordinator still has getProcessorId. Does it make sense if I move the
> getProcessorIdGenerator helper to ProcessorIdGenerator?
>
> > Theoretically, a processor/container doesn't need to generate multiple
> IDs, it only needs to know its own
> I believe the reasoning originated from SAMZA-881, where we identified the
> requirements for running Samza in distributed mode and the responsibilities
> of each component in a homogeneous stream processing mode. Theoretically
> speaking, it is the job coordinator that will assign identifiers to its
> processor. Practically, since it is bound to the runtime environment, it
> seems appropriate for the job coordinator to generate the id. If you
> haven't read SAMZA-881, you should give it a read. If we go by the
> assumption that a leader processor (in simpler terms, a central authority)
> generates the JobModel, it needs to "know" the identifiers of all
> processors.
> An alternative model is the one where the leader spawns the processors
> and also "assigns" identifiers for all processors (for example, in Yarn
> today). The latter model is restrictive in that:
> 1. it expects the leader to know the number of processors required in the
> job
> 2. the leader processor is different from other processors, thus making a
> Samza job a more heterogeneous set of processors
>
> Hope these points make sense. Jagadish can add more, and even correct me
> if I got anything wrong here :)
>
> > 1. The SEP uses both ProcessorIdentifier and ProcessorIdGenerator as
> synonyms. Let's update to use ProcessorIdGenerator consistently.
> Will do
>
> > 2. Minor: 'processor.id' configuration: I'm assuming this still needs
> to be unique for each processor in the job? If so, probably worth calling
> out in the SEP or configuration docs. We can also document it as deprecated
> and a candidate for removal in near future (maybe 0.14?).
> Yes. That is still a requirement. I think I updated the document regarding
> deprecating and removing it. I will re-check.
>
> Thanks for your comments.
>
> Cheers!
> Navina
>
> On Thu, Mar 23, 2017 at 12:04 PM, Prateek Maheshwari <
> pmahes

Re: [DISCUSS] Support Scala 2.12

2017-03-29 Thread Prateek Maheshwari
Hi Maksim,

I'm in favor of adding Scala 2.12 support as well, thanks for the PR.
I have a few questions about the way the JavaConverters APIs and some of the
conversions in the PR work. I'll try it out locally and update the PR with
feedback/questions soon.

Thanks,
Prateek


On Tue, Mar 28, 2017 at 3:01 PM, Maksim Logvinenko 
wrote:

> Hi guys,
>
> As far as I can understand nobody is against having Scala 2.12 support in
> Samza master. Can we merge PR then?
>
> Best regards,
> Maxim Logvinenko
>
> On 17 March 2017 at 23:42:16, Navina Ramesh (nram...@linkedin.com.invalid)
> wrote:
>
> Thanks for creating the DISCUSS email!
>
> This is good. It's a good idea to update to 2.12 since it looks like we are
> fully backward compatible with older versions. +1 from me.
>
> Cheers!
> Navina
>
> On Fri, Mar 17, 2017 at 1:34 PM, Jagadish Venkatraman <
> jagadish1...@gmail.com> wrote:
>
> > Thanks for starting this discussion and the patch. +1 for supporting Scala
> > 2.12. I assume the changes are fully backwards compatible with Scala 2.10
> > and 2.11 (as evidenced by your check-all)?
> >
> > Also, another observation is that the generated Samza binaries will have
> > 2.12 as the suffix for the future release (I think this should be totally OK).
> >
> >
> > On Fri, Mar 17, 2017 at 1:26 PM, Maksim Logvinenko <
> mlogvine...@gmail.com>
>
> > wrote:
> >
> > > Hi guys,
> > >
> > > I’ve created JIRA and already submitted patch which adds support of
> scala
> > > 2.12. Here is the ticket: https://issues.apache.org/
> > jira/browse/SAMZA-1135
> > > .
> > > Nothing serious: I’ve removed JavaConversions usage (because it’s
> marked
> > as
> > > deprecated now) and bumped kafka and scalatest versions since previous
> > > versions don’t have scala 2.12 support. I run ./bin/check-all.sh on my
> > > laptop and it was successful for all scala versions (2.10, 2.11 and
> 2.12)
> > > and for both YARN versions.
> > >
> > > Thanks,
> > > Maxim Logvinenko
> > >
> >
> >
> >
> > --
> > Jagadish V,
> > Graduate Student,
> > Department of Computer Science,
> > Stanford University
> >
>
>
>
> --
> Navina R.
>


Re: [VOTE] SEP-1: Semantics of ProcessorId in Samza

2017-03-29 Thread Prateek Maheshwari
+1 (non binding) from me.

- Prateek

On Tue, Mar 28, 2017 at 2:17 PM, Boris S  wrote:

> +1 Looks good to me.
>
> On Tue, Mar 28, 2017 at 2:00 PM, xinyu liu  wrote:
>
> > +1 on my side. Very happy to see this proposal. This is a blocker for
> > integrating fluent API with StreamProcessor, and hopefully we can get it
> > resolved soon :).
> >
> > Thanks,
> > Xinyu
> >
> > On Tue, Mar 28, 2017 at 11:28 AM, Navina Ramesh (Apache) <
> > nav...@apache.org>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > This is a voting thread for SEP-1: Semantics of ProcessorId in Samza.
> > > For reference, here is the wiki link:
> > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-
> > > 1%3A+Semantics+of+ProcessorId+in+Samza
> > >
> > > Link to discussion mail thread:
> > > http://mail-archives.apache.org/mod_mbox/samza-dev/201703.
> > > mbox/%3CCANazzuuHiO%3DvZQyFbTiYU-0Sfh3riK%3Dz4j_
> > AdCicQ8rBO%3DXuYQ%40mail.
> > > gmail.com%3E
> > >
> > > Please vote on this SEP asap. :)
> > >
> > > Thanks!
> > > Navina
> > >
> >
>


Re: [VOTE] SEP-1: Semantics of ProcessorId in Samza

2017-03-30 Thread Prateek Maheshwari
Yi, why add 'local' to the method name? Isn't the method called only by the
StreamProcessor to get its own ID? Seems like both 1 & 2 belong in the
method documentation.

- Prateek

On Thu, Mar 30, 2017 at 1:43 PM, Yi Pan  wrote:

> Talked w/ Navina offline and agreed upon:
> 1) JobCoordinator.getLocalProcessorId() to be clear that we are getting
> the
> local processorId
> 2) Document the use case that there might be multiple StreamProcessors in
> the same JVM and ProcessorIdGenerator should implement a counter in this
> case.
>
> So, +1 (binding)
>
> On Thu, Mar 30, 2017 at 1:23 PM, Renato Marroquín Mogrovejo <
> renatoj.marroq...@gmail.com> wrote:
>
> > Hi Navina,
> >
> > Thanks for the great proposal! Having the big proposals documented on
> SEPs
> > is really great to have a good understanding on the system!
> > I have only a clarification question, the proposal states that every
> > containerId is the same as the processorId. So this means that inside a
> > container there will be a single processor? is this related to SAMZA-1080
> > somehow?
> >
> >
> > Best,
> >
> > Renato M.
> >
> > 2017-03-30 20:45 GMT+02:00 Navina Ramesh :
> >
> > > Hi Yi,
> > > Good question. Three reasons:
> > >
> > > 1. In SAMZA-881, we came up with a set of responsibilities for the
> > > JobCoordinator. One of them was to generate/assign processorId. So, it
> > > makes sense to keep getProcessorId() within JobCoordinator interface.
> > > 2. StreamProcessor was initially introduced as a user-facing API in
> > > SAMZA-1080. ProcessorId was an argument in the StreamProcessor
> > > constructor, which pushed the burden of guaranteeing uniqueness among
> > > the processors of a job onto the user. This was not favorable.
> > > 3. In general, I think we have consensus that the processorIdGenerator
> > > is going to be specific to a runtime environment. Hence, it seems more
> > > appropriate to move it to a lower abstraction layer that deals with the
> > > underlying execution environment.
> > >
> > > Let me know if you have a different perspective on this.
> > >
> > > Cheers!
> > > Navina
> > >
> > > On Thu, Mar 30, 2017 at 9:42 AM, Yi Pan  wrote:
> > >
> > > > @Navina,
> > > >
> > > > Sorry to chime in late. One question:
> > > > 1. Why is it in JobCoordinator, and why not in StreamProcessor class?
> > > > Because JobCoordinator provides coordination service across many
> > > > processors, an interface getProcessorId() in JobCoordinator is
> > confusing
> > > > regarding to which processorId we are getting.
> > > >
> > > > Otherwise, the proposal looks good.
> > > >
> > > > -Yi
> > > >
> > > > On Wed, Mar 29, 2017 at 7:57 PM, Navina Ramesh
> > > > wrote:
> > > >
> > > > > Good to hear from you, Yan. Thanks! :)
> > > > >
> > > > > On Wed, Mar 29, 2017 at 7:48 PM, Yan Fang 
> > > wrote:
> > > > >
> > > > > > +1 . Thanks for the proposal, Navina. :)
> > > > > >
> > > > > > Fang, Yan
> > > > > > yanfang...@gmail.com
> > > > > >
> > > > > > On Thu, Mar 30, 2017 at 4:24 AM, Prateek Maheshwari <
> > > > > > pmaheshw...@linkedin.com.invalid> wrote:
> > > > > >
> > > > > > > +1 (non binding) from me.
> > > > > > >
> > > > > > > - Prateek
> > > > > > >
> > > > > > > On Tue, Mar 28, 2017 at 2:17 PM, Boris S 
> > wrote:
> > > > > > >
> > > > > > > > +1 Looks good to me.
> > > > > > > >
> > > > > > > > On Tue, Mar 28, 2017 at 2:00 PM, xinyu liu <
> > > xinyuliu...@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > +1 on my side. Very happy to see this proposal. This is a
> > > blocker
> > > > > for
> > > > > > > > > integrating fluent API with StreamProcessor, and hopefully
> we
> > > can
> > > > > get
> > > > > > > it
> > > > > > > > > resolved soon :).
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Xinyu
> > > > > > > > >
> > > > > > > > > On Tue, Mar 28, 2017 at 11:28 AM, Navina Ramesh (Apache) <
> > > > > > > > > nav...@apache.org>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi everyone,
> > > > > > > > > >
> > > > > > > > > > This is a voting thread for SEP-1: Semantics of
> ProcessorId
> > > in
> > > > > > Samza.
> > > > > > > > > > For reference, here is the wiki link:
> > > > > > > > > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-
> > > > > > > > > > 1%3A+Semantics+of+ProcessorId+in+Samza
> > > > > > > > > >
> > > > > > > > > > Link to discussion mail thread:
> > > > > > > > > > http://mail-archives.apache.or
> g/mod_mbox/samza-dev/201703.
> > > > > > > > > > mbox/%3CCANazzuuHiO%3DvZQyFbTiYU-0Sfh3riK%3Dz4j_
> > > > > > > > > AdCicQ8rBO%3DXuYQ%40mail.
> > > > > > > > > > gmail.com%3E
> > > > > > > > > >
> > > > > > > > > > Please vote on this SEP asap. :)
> > > > > > > > > >
> > > > > > > > > > Thanks!
> > > > > > > > > > Navina
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Navina R.
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Navina R.
> > >
> >
>
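
Point 2 in Yi's note above (several StreamProcessors in one JVM, with
ProcessorIdGenerator implementing a counter) could look roughly like the
sketch below. The interface shown is a hypothetical stand-in declared only
for this example; the actual SEP-1 signature may differ, and the prefix
argument (for instance a host name) is an assumption.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the interface discussed in SEP-1; the real signature may differ.
interface ProcessorIdGenerator {
  String generateProcessorId();
}

// Appends a JVM-wide counter so that several StreamProcessors in one JVM get distinct ids.
final class CountingProcessorIdGenerator implements ProcessorIdGenerator {
  // Shared by all instances in this JVM, so ids stay unique across generators.
  private static final AtomicInteger COUNTER = new AtomicInteger();
  private final String prefix;

  CountingProcessorIdGenerator(String prefix) {
    this.prefix = prefix;
  }

  @Override
  public String generateProcessorId() {
    return prefix + "-" + COUNTER.incrementAndGet();
  }
}
```

The counter only guarantees uniqueness within one JVM; uniqueness across
hosts would still have to come from the prefix.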


Re: [DISCUSS] Support Scala 2.12

2017-03-30 Thread Prateek Maheshwari
Hi Maksim,

Thanks for the PR comments and updates. Looks good to me too.

- Prateek

On Wed, Mar 29, 2017 at 12:17 PM, Prateek Maheshwari <
pmaheshw...@linkedin.com> wrote:

> Hi Maksim,
>
> I'm in favor of adding Scala 2.12 support as well, thanks for the PR.
> I have a few questions about the way JavaConverter APIs and some of the
> conversions in the PR work. I'll try it out locally and update the PR with
> feedback/questions soon.
>
> Thanks,
> Prateek
>
>
> On Tue, Mar 28, 2017 at 3:01 PM, Maksim Logvinenko 
> wrote:
>
>> Hi guys,
>>
>> As far as I can understand nobody is against having Scala 2.12 support in
>> Samza master. Can we merge PR then?
>>
>> Best regards,
>> Maxim Logvinenko
>>
>> On 17 March 2017 at 23:42:16, Navina Ramesh (nram...@linkedin.com.invalid
>> )
>> wrote:
>>
>> Thanks for creating the DISCUSS email!
>>
>> This is good. It's a good idea to update to 2.12 since it looks like we
>> are
>> fully backward compatible with older versions. +1 from me.
>>
>> Cheers!
>> Navina
>>
>> On Fri, Mar 17, 2017 at 1:34 PM, Jagadish Venkatraman <
>> jagadish1...@gmail.com> wrote:
>>
>> > Thanks for starting this discussion and the patch. +1 for supporting
>> scala
>> > 2.12. I assume the changes are fully backwards compatible with scala
>> 2.10,
>> > 2.11 (as evidenced by your check-all)?
>> >
>> > Also, another observation is that the generated Samza binaries will have
>> > 2.12 as the suffix for the future release (I think this should be totally OK).
>> >
>> >
>> > On Fri, Mar 17, 2017 at 1:26 PM, Maksim Logvinenko <
>> mlogvine...@gmail.com>
>>
>> > wrote:
>> >
>> > > Hi guys,
>> > >
>> > > I’ve created JIRA and already submitted patch which adds support of
>> scala
>> > > 2.12. Here is the ticket: https://issues.apache.org/
>> > jira/browse/SAMZA-1135
>> > > .
>> > > Nothing serious: I’ve removed JavaConversions usage (because it’s
>> marked
>> > as
>> > > deprecated now) and bumped kafka and scalatest versions since previous
>> > > versions don’t have scala 2.12 support. I run ./bin/check-all.sh on my
>> > > laptop and it was successful for all scala versions (2.10, 2.11 and
>> 2.12)
>> > > and for both YARN versions.
>> > >
>> > > Thanks,
>> > > Maxim Logvinenko
>> > >
>> >
>> >
>> >
>> > --
>> > Jagadish V,
>> > Graduate Student,
>> > Department of Computer Science,
>> > Stanford University
>> >
>>
>>
>>
>> --
>> Navina R.
>>
>
>


Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-23 Thread Prateek Maheshwari
Thanks for putting this together Yi!

I agree with Jake, it does seem like there are a few too many moving parts
here. That said, the problem being solved is pretty broad, so let me try to
summarize my current understanding of the requirements. Please correct me
if I'm wrong or missing something.

ApplicationRunner and JobRunner first, ignoring test environment for the
moment.
ApplicationRunner:
1. Create execution plan: Same in Standalone and Yarn
2. Create intermediate streams: Same logic but different leader election
(ZK-based or pre-configured in standalone, AM in Yarn).
3. Run jobs: In JVM in standalone. Submit to the cluster in Yarn.

JobRunner:
1. Run the StreamProcessors: Same process in Standalone & Test. Remote host
in Yarn.

To get a single ApplicationRunner implementation, like Jake suggested, we
need to make leader election and JobRunner implementation pluggable.
There's still the question of whether ApplicationRunner#run API should be
blocking or non-blocking. It has to be non-blocking in YARN. We want it to
be blocking in standalone, but seems like the main reason is ease of use
when launched from main(). I'd prefer making it consistently non-blocking
instead, esp. since in embedded standalone mode (where the processor is
running in another container) a blocking API would not be user-friendly
either. If not, we can add both run and runBlocking.
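
To make the run / runBlocking option concrete, here is a minimal sketch in
which the blocking call is just a thin wrapper over the non-blocking one.
SketchRunner and its methods are hypothetical names for illustration, not
the ApplicationRunner API from the SEP, and the application is modelled as a
plain Runnable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical single-use runner; not the Samza ApplicationRunner API.
final class SketchRunner {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  // Non-blocking: starts the application and returns a handle immediately.
  CompletableFuture<Void> run(Runnable application) {
    CompletableFuture<Void> done = CompletableFuture.runAsync(application, executor);
    // Release the worker thread once the application finishes or fails.
    done.whenComplete((result, error) -> executor.shutdown());
    return done;
  }

  // Blocking convenience for main(): waits for completion and rethrows any
  // application failure wrapped in a CompletionException.
  void runBlocking(Runnable application) {
    run(application).join();
  }
}
```

With this shape, an embedded caller keeps the returned future and can attach
its own completion callback, while a main() method calls runBlocking and
sees the failure cause directly.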

Coming to RuntimeEnvironment, which is the least clear to me so far:
1. I don't think RuntimeEnvironment should be responsible for providing
StreamSpecs for streamIds - they can be obtained with a config/util class.
The StreamProcessor should only know about logical streamIds and the
streamId <-> actual stream mapping should happen within the
SystemProducer/Consumer/Admins provided by the RuntimeEnvironment.
2. There are also other components that the user might be interested in
providing implementations of in embedded Standalone mode (i.e., not just in
tests) - MetricsRegistry and JMXServer come to mind.
3. Most importantly, it's not clear to me who creates and manages the
RuntimeEnvironment. It seems like it should be the ApplicationRunner or the
user because of (2) above and because StreamManager also needs access to
SystemAdmins for creating intermediate streams which users might want to
mock. But it also needs to be passed down to the StreamProcessor - how
would this work on Yarn?

I think we should figure out how to integrate RuntimeEnvironment with
ApplicationRunner before we can make a call on one vs. multiple
ApplicationRunner implementations. If we do keep LocalApplicationRunner and
RemoteApplicationRunner (and TestApplicationRunner) separate, I agree with Jake
that we should remove the JobRunners and roll them up into the respective
ApplicationRunners.

- Prateek

On Thu, Apr 20, 2017 at 10:06 AM, Jacob Maes  wrote:

> Thanks for the SEP!
>
> +1 on introducing these new components
> -1 on the current definition of their roles (see Design feedback below)
>
> *Design*
>
>- If LocalJobRunner and RemoteJobRunner handle the different methods of
>launching a Job, what additional value do the different types of
>ApplicationRunner and RuntimeEnvironment provide? It seems like a red
> flag
>that all 3 would need to change from environment to environment. It
>indicates that they don't have proper modularity. The
> call-sequence-figures
>support this; LocalApplicationRunner and RemoteApplicationRunner make
> the
>same calls and the diagram only varies after jobRunner.start()
>- As far as I can tell, the only difference between Local and Remote
>ApplicationRunner is that one is blocking and the other is
> non-blocking. If
>that's all they're for then either the names should be changed to
> reflect
>this, or they should be combined into one ApplicationRunner and just
> expose
>separate methods for run() and runBlocking()
>- There isn't much detail on why the main() methods for Local/Remote
>have such different implementations, how they receive the Application
>(direct vs config), and concretely how the deployment scripts, if any,
>should interact with them.
>
>
> *Style*
>
>- nit: None of the 11 uses of the word "actual" in the doc are
> *actually*
>needed. :-)
>- nit: Colors of the runtime blocks in the diagrams are unconventional
>and a little distracting. Reminds me of nai won bao. Now I'm hungry. :-)
>- Prefer the name "ExecutionEnvironment" over "RuntimeEnvironment". The
>term "execution environment" is used
>- The code comparisons for the ApplicationRunners are not apples-apples.
>The local runner example is an application that USES the local runner.
> The
>remote runner example is just the runner code itself. So, it's not
>readily apparent that we're comparing the main() methods and not the
>application itself.
>
>
> On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan  wrote:
>
> > Made some updates to clarify the role and functions of RuntimeEnvironment
> >

Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-28 Thread Prateek Maheshwari
support #4. For remote
> > runner.run(), the operator user classes/lambdas in the StreamGraph need to
> > be serialized. As today, the existing option is to serialize to a stream,
> > either the coordinator stream or the pipeline control stream, which will
> > have the size limit per message. Do you see RPC as an option?
> >
> > For this version of API, seems we don't need the StreamApplication
> wrapper
> > as well as exposing the StreamGraph. Do you think we are on the right
> path?
> >
> > Thanks,
> > Xinyu
> >
> >
> > On Thu, Apr 27, 2017 at 6:09 AM, Chris Pettitt <
> > cpett...@linkedin.com.invalid> wrote:
> >
> >> That should have been:
> >>
> >> For #1, Beam doesn't have a hard requirement...
> >>
> >> On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt 
> >> wrote:
> >>
> >> > For #1, I doesn't have a hard requirement for any change from Samza. A
> >> > very nice to have would be to allow the input systems to be set up at
> >> the
> >> > same time as the rest of the StreamGraph. An even nicer to have would
> >> be to
> >> > do away with the callback based approach and treat graph building as a
> >> > library, a la Beam and Flink.
> >> >
> >> > For the moment I've worked around the two pass requirement (once for
> >> > config, once for StreamGraph) by introducing an IR layer between Beam
> >> and
> >> > the Samza Fluent translation. The IR layer is convenient independent
> of
> >> > this problem because it makes it easier to switch between the Fluent
> and
> >> > low-level APIs.
> >> >
> >> >
> >> > For #4, if we had parity with StreamProcessor for lifecycle we'd be in
> >> > great shape. One additional issue with the status call that I may not
> >> have
> >> > mentioned is that it provides you no way to get at the cause of
> failure.
> >> > The StreamProcessor API does allow this via the callback.
> >> >
> >> >
> >> > Re. #2 and #3, I'm a big fan of getting rid of the extra configuration
> >> > indirection you currently have to jump through (this is also related
> to
> >> > system consumer configuration from #1). It makes it much easier to
> >> discover
> >> > what the configurable parameters are too, if we provide some
> >> programmatic
> >> > way to tweak them in the API - which can turn into config under the
> >> hood.
> >> >
> >> > On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu 
> >> wrote:
> >> >
> >> >> Let me give a shot to summarize the requirements for
> ApplicationRunner
> >> we
> >> >> have discussed so far:
> >> >>
> >> >> - Support environment for passing in user-defined objects (streams
> >> >> potentially) into ApplicationRunner (*Beam*)
> >> >>
> >> >> - Improve ease of use for ApplicationRunner to avoid complex
> >> >> configurations
> >> >> such as zkCoordinator, zkCoordinationService. (*Standalone*)
> >> >>
> >> >> - Clean up ApplicationRunner into a single interface (*Fluent*). We
> can
> >> >> have one or more implementations but it's hidden from the users.
> >> >>
> >> >> - Separate StreamGraph from environment so it can be serializable
> >> (*Beam,
> >> >> Yarn*)
> >> >>
> >> >> - Better life cycle management of application, including
> >> >> start/stop/stats (*Standalone,
> >> >> Beam*)
> >> >>
> >> >>
> >> >> One way to address 2 and 3 is to provide pre-packaged runner using
> >> static
> >> >> factory methods, and the return type will be the ApplicationRunner
> >> >> interface. So we can have:
> >> >>
> >> >>   ApplicationRunner runner = ApplicationRunner.zk() /
> >> >> ApplicationRunner.local()
> >> >> / ApplicationRunner.remote() / ApplicationRunner.test().
> >> >>
> >> >> Internally we will package the right configs and run-time environment
> >> with
> >> >> the runner. For example, ApplicationRunner.zk() will define all the
> >> >> configs
> >> >> needed for zk coordination.
> >> >>
> >> >> To support 1 and 4, can we pass in

[DISCUSS] Samza 0.13.0 release

2017-05-05 Thread Prateek Maheshwari
Hi all,

There have been quite a lot of new features added to master since 0.12
release to warrant a new major release. At LinkedIn, we've done functional
and performance testing against master in the past weeks, and deployed jobs
with the latest build in production. We will continue to test for stability
in the next few weeks.

We've made significant progress on several exciting new features:
SAMZA-1063: Samza Standalone Project
SAMZA-1073: Top-level fluent API
SAMZA-1130: ApplicationRunner for running StreamApplication

Here are a few additional features that will also be included in this
release:
SAMZA-868: Support Elasticsearch version 2.x
SAMZA-1135: Support Scala 2.12
SAMZA-1140: Non blocking commit in Async Runloop
SAMZA-1143: Universal config support for localized resource
SAMZA-1145: Provide Ability To Configure The Default Number Of Changelog
Replicas
SAMZA-1154: Tasks endpoint to list the complete details of all tasks
related to a job

An exhaustive list of changes in 0.13 can be found here.

Here are the issues that we should consider as blockers for the 0.13.0
release:
SAMZA-871: Heart-beat mechanism between JobCoordinator and all running
containers
SAMZA-1150: Handling Error propagation between ZkJobCoordinator &
DebounceTimer
SAMZA-1153: Metrics should be added for ZK based JobCoordinator
SAMZA-1155: Allow configuring window.ms to specify trigger duration
SAMZA-1268: Final renamings and javadocs for public APIs for 0.13 release
SAMZA-1267: ApplicationRunner#getLocalRunner returns null

Here's what I propose:
1. Cut an 0.13.0 release branch.
2. Work on getting the blocker JIRAs resolved.
3. Target a release vote on the week of May 15th.

Please let us know if there are any other changes you'd like to go in
0.13.0.

Thanks,
Prateek


Re: [VOTE] Apache Samza 0.13.0 RC0

2017-05-17 Thread Prateek Maheshwari
Resent the CANCEL email, hopefully it makes it this time.

- Prateek

On Wed, May 17, 2017 at 2:08 PM, Navina Ramesh (Apache) 
wrote:

> Prateek told me that he sent out a cancel email. It didn't reach the
> mail-archive I think. Lately, we have this kind of issues where the emails
> are not reaching our dev list.
>
> On Wed, May 17, 2017 at 2:06 PM, Yi Pan  wrote:
>
> > Hi, all,
> >
> > Based on the conversation above, can we officially cancel this vote?
> >
> > Thanks!
> >
> > -Yi
> >
> > On Mon, May 15, 2017 at 9:31 AM, Ignacio Solis  wrote:
> >
> > > Thanks!
> > >
> > > On Mon, May 15, 2017 at 8:00 AM, Navina Ramesh
> > >  wrote:
> > > > I will try to get the patch out today. Work doesn't look trivial. I
> am
> > on
> > > > it.
> > > >
> > > > Navina
> > > >
> > > > On May 14, 2017 23:10, "Ignacio Solis"  wrote:
> > > >
> > > >> We should hold off until it is solved.  How long will it take to fix
> > > this?
> > > >>
> > > >> On Sun, May 14, 2017 at 10:13 PM, Navina Ramesh (Apache)
> > > >>  wrote:
> > > >> > I just changed the status of this JIRA to "BLOCKER" -
> > > >> > https://issues.apache.org/jira/browse/SAMZA-1128
> > > >> >
> > > >> > This causes a bug in standalone deployment where any failure in
> the
> > > >> barrier
> > > >> > protocol stops the scheduled executorservice. Unfortunately,
> > > >> > CoordinationUtils creates its own scheduled executorservice, which
> > is
> > > >> > incorrect. Scheduled ExecutorService is meant to be the working
> > queue
> > > for
> > > >> > the ZkJobCoordinator. This needs to be fixed. Bharath already ran
> > into
> > > >> this
> > > >> > bug during testing on Friday.
> > > >> >
> > > >> > veto for this release candidate.
> > > >> >
> > > >> > @Prateek/Jagadish:
> > > >> > I recommend sending a "non-vote, testing release candidate" for
> this
> > > >> > release until we complete all pending tasks (includes docs, tests
> > > etc).
> > > >> It
> > > >> > will also be useful to share the pending tasks and their progress.
> > In
> > > >> case
> > > >> > you have already shared it, I might have missed it since some
> emails
> > > are
> > > >> > bouncing off my inbox.
> > > >> >
> > > >> > Thanks!
> > > >> > Navina
> > > >> >
> > > >> > On Sun, May 14, 2017 at 1:30 PM, Boris S 
> wrote:
> > > >> >
> > > >> >> I think we need to add SAMZA-1286 and
> > > > >> >> SAMZA-1279 to the release.
> > > >> >>
> > > >> >> On Wed, May 10, 2017 at 7:51 PM, Jagadish Venkatraman <
> > > >> jagad...@apache.org
> > > >> >> >
> > > >> >> wrote:
> > > >> >>
> > > >> >> > This is a call for a vote on a release of Apache Samza 0.13.0.
> > > Thanks
> > > >> to
> > > >> >> > everyone who has contributed to this release. We are very glad
> to
> > > see
> > > >> >> some
> > > >> >> > new contributors and features in this release.
> > > >> >> >
> > > >> >> > The release candidate can be downloaded from here:
> > > >> >> > http://home.apache.org/~jagadish/samza-0.13.0-rc0/
> > > >> >> >
> > > >> >> > The release candidate is signed with pgp key AF81FFBF, which
> can
> > be
> > > >> found
> > > >> >> > on keyservers:
> > > >> >> > http://pgp.mit.edu/pks/lookup?op=get&search=0xAF81FFBF
> > > >> >> >
> > > >> >> > The git tag is release-0.13.0-rc0 and signed with the same pgp
> > key:
> > > >> >> > https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=
> > > >> >> > refs/tags/release-0.13.0-rc0
> > > >> >> >
> > > >> >> > Test binaries have been published to Maven's staging
> repository,
> > > and
> > > >> are
> > > >> >> > available here:
> > > >> >> > https://repository.apache.org/content/repositories/
> > > >> orgapachesamza-1020
> > > >> >> >
> > > >> >> > 127 issues were resolved for this release:
> > > >> >> > https://issues.apache.org/jira/issues/?jql=project%20%
> > > >> >> > 3D%20SAMZA%20AND%20fixVersion%20in%20(0.13%2C%200.13.0)%
> > > >> >> > 20AND%20status%20in%20(Resolved%2C%20Closed)
> > > >> >> >
> > > >> >> > The vote will be open for 72 hours (ending at 8:00PM Saturday,
> > > >> >> 05/13/2017).
> > > >> >> >
> > > >> >> > Please download the release candidate, check the
> > hashes/signature,
> > > >> build
> > > >> >> it
> > > >> >> > and test it, and then please vote:
> > > >> >> >
> > > >> >> >
> > > >> >> > [ ] +1 approve
> > > >> >> >
> > > >> >> > [ ] +0 no opinion
> > > >> >> >
> > > >> >> > [ ] -1 disapprove (and reason why)
> > > >> >> >
> > > >> >> >
> > > >> >> > +1 from my side for the release.
> > > >> >> >
> > > >> >> > Cheers!
> > > >> >> >
> > > >> >>
> > > >>
> > > >>
> > > >>
> > > >> --
> > > >> Nacho - Ignacio Solis - iso...@igso.net
> > > >>
> > >
> > >
> > >
> > > --
> > > Nacho - Ignacio Solis - iso...@igso.net
> > >
> >
>
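
Navina's description of SAMZA-1128 above (a failure stopping the scheduled
executor service) is consistent with the documented behaviour of
java.util.concurrent.ScheduledExecutorService: if one execution of a
periodic task throws, later executions are suppressed. Whether that is the
exact mechanism in the Samza bug is an assumption here. The sketch below
shows the behaviour and one common guard; the class and method names are
made up for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class PeriodicTaskDemo {
  // Returns true if an unguarded periodic task was terminated by its own exception.
  static boolean unguardedTaskStops() throws InterruptedException {
    ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
    try {
      ScheduledFuture<?> future = ses.scheduleAtFixedRate(
          () -> { throw new IllegalStateException("barrier failed"); },
          0, 10, TimeUnit.MILLISECONDS);
      try {
        future.get(); // a periodic task only completes via cancellation or failure
        return false;
      } catch (ExecutionException e) {
        return true; // the task threw, so it will not be scheduled again
      }
    } finally {
      ses.shutdownNow();
    }
  }

  // Returns true if a task that catches its own failures keeps being rescheduled.
  static boolean guardedTaskKeepsRunning() throws InterruptedException {
    ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
    CountDownLatch threeRuns = new CountDownLatch(3);
    try {
      ses.scheduleAtFixedRate(() -> {
        try {
          throw new IllegalStateException("barrier failed");
        } catch (RuntimeException e) {
          threeRuns.countDown(); // handle or log here instead of propagating
        }
      }, 0, 10, TimeUnit.MILLISECONDS);
      return threeRuns.await(5, TimeUnit.SECONDS);
    } finally {
      ses.shutdownNow();
    }
  }
}
```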


Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

2017-05-25 Thread Prateek Maheshwari
Hi Xinyu,

Thanks for the proposal. Some requests for clarifications. Let's update the
SEP directly instead of replying here.

E.g., in "For any following intermediate stream whose input streams are all
end-of-stream, it will be marked as pending EOS" - Should clarify that
(IIUC) something is injecting EOS messages in all intermediate stream
partitions once it receives EOS from all input stream partitions it's
consuming. Should also clarify what that something is.
Same for "declare end of stream once all the EOS messages have been
received." - What does this declaration involve and who is doing this?
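
For reference, my reading of "declare end of stream once all the EOS
messages have been received" is roughly the bookkeeping below. This is only
one possible interpretation, not what the SEP specifies; EndOfStreamTracker
is a hypothetical name, and partitions are identified by plain strings for
brevity.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical tracker for EOS markers across the input partitions of one consumer.
final class EndOfStreamTracker {
  private final Set<String> pending;

  EndOfStreamTracker(Set<String> inputPartitions) {
    this.pending = new HashSet<>(inputPartitions);
  }

  // Records an EOS marker. Returns true exactly once: when the last pending
  // partition reports EOS. Duplicate or unknown markers return false.
  boolean onEndOfStream(String partition) {
    boolean removed = pending.remove(partition);
    return removed && pending.isEmpty();
  }
}
```

Whoever owns this tracker is the "something" that would then inject EOS
downstream, which is the part the SEP should spell out.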

In pro for approach 2: Not clear what this means - "The watermark can
conclude the input messages before this watermark have been complete."

For the cons of approach 2: "Complicated failure scenario of the second
job. It needs to checkpoint all the watermark messages received, so when it
recovered from failure, it can still count." - How is this related to EOS?
How is this related to the checkpoint watermark section?
Also, what is the "more messages required to write.. " referring to?

"Samza needs to reconcile based on the task counts." - Please explain what
reconciliation means, why it needs to happen, and why we need to track the
producer task and total task count in the watermark message to do this.

Checkpoint watermarks section is also unclear. What problem are we trying
to solve here?

Should also move the message format and the watermark message interface
sections to the bottom, since they depend on details in the event time and
checkpoint watermark sections.

Thanks,
Prateek


On Wed, May 24, 2017 at 11:30 AM, xinyu liu  wrote:

> Hi all,
>
> I created SEP-6 for SAMZA-1260: Support Watermark Across Intermediate
> Streams for Batch Processing. The link to the SEP is here:
>
> https://cwiki.apache.org/confluence/display/SAMZA/SEP-
> 6+Support+Watermark+Across+Intermediate+Streams+for+Batch+Processing
>
> Please review and comments are welcome!
>
> Thanks,
> Xinyu
>


Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-06-20 Thread Prateek Maheshwari
ogrammatically in env. Suppose we have the
> following
> >>>> > before
> >>>> > > > > runner.input(...):
> >>>> > > > >
> >>>> > > > >   runner.setup(env /* a writable interface of env*/ -> {
> >>>> > > > > env.setStreamSpec(streamId, streamSpec);
> >>>> > > > > env.setSystem(systemName, systemFactory);
> >>>> > > > >   })
> >>>> > > > >
> >>>> > > > > runner.setup(->) also provides setup for stores and other
> runtime
> >>>> > stuff
> >>>> > > > > needed for the execution. The setup should be able to
> serialized
> >>>> to
> >>>> > > > config.
> >>>> > > > > For #6, I haven't figured out a good way to inject
> user-defined
> >>>> > objects
> >>>> > > > > here yet.
> >>>> > > > >
> >>>> > > > > With this API, we should be able to also support #4. For
> remote
> >>>> > > > > runner.run(), the operator user classes/lambdas in the
> StreamGraph
> >>>> > need
> >>>> > > to
> >>>> > > > > be serialized. As today, the existing option is to serialize
> to a
> >>>> > > stream,
> >>>> > > > > either the coordinator stream or the pipeline control stream,
> >>>> which
> >>>> > > will
> >>>> > > > > have the size limit per message. Do you see RPC as an option?
> >>>> > > > >
> >>>> > > > > For this version of API, seems we don't need the
> StreamApplication
> >>>> > > > wrapper
> >>>> > > > > as well as exposing the StreamGraph. Do you think we are on
> the
> >>>> right
> >>>> > > > path?
> >>>> > > > >
> >>>> > > > > Thanks,
> >>>> > > > > Xinyu
> >>>> > > > >
> >>>> > > > >
> >>>> > > > > On Thu, Apr 27, 2017 at 6:09 AM, Chris Pettitt <
> >>>> > > > > cpett...@linkedin.com.invalid> wrote:
> >>>> > > > >
> >>>> > > > >> That should have been:
> >>>> > > > >>
> >>>> > > > >> For #1, Beam doesn't have a hard requirement...
> >>>> > > > >>
> >>>> > > > >> On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt <
> >>>> > cpett...@linkedin.com
> >>>> > > >
> >>>> > > > >> wrote:
> >>>> > > > >>
> >>>> > > > >> > For #1, I doesn't have a hard requirement for any change
> from
> >>>> > > Samza. A
> >>>> > > > >> > very nice to have would be to allow the input systems to be
> >>>> set up
> >>>> > > at
> >>>> > > > >> the
> >>>> > > > >> > same time as the rest of the StreamGraph. An even nicer to
> have
> >>>> > > would
> >>>> > > > >> be to
> >>>> > > > >> > do away with the callback based approach and treat graph
> >>>> building
> >>>> > > as a
> >>>> > > > >> > library, a la Beam and Flink.
> >>>> > > > >> >
> >>>> > > > >> > For the moment I've worked around the two pass requirement
> >>>> (once
> >>>> > for
> >>>> > > > >> > config, once for StreamGraph) by introducing an IR layer
> >>>> between
> >>>> > > Beam
> >>>> > > > >> and
> >>>> > > > >> > the Samza Fluent translation. The IR layer is convenient
> >>>> > independent
> >>>> > > > of
> >>>> > > > >> > this problem because it makes it easier to switch between
> the
> >>>> > Fluent
> >>>> > > > and
> >>>> > > > >> > low-level APIs.

Re: Kafka client.id collision

2017-07-20 Thread Prateek Maheshwari
+1 for adding system name to the client id.

- Prateek
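
A rough sketch of what appending the system name could look like. The
helper below is hypothetical and is not the signature of
KafkaUtil.getClientId; the argument order and the sanitizing rule are
assumptions made for the example.

```java
final class ClientIds {
  private ClientIds() {}

  // Hypothetical helper: builds a client id that also carries the system name,
  // so clients created for different systems in one container do not collide.
  static String clientId(String component, String jobName, String jobId, String systemName) {
    String raw = String.join("-", component, jobName, jobId, systemName);
    // Conservative character set: anything else is replaced with an underscore.
    return raw.replaceAll("[^A-Za-z0-9_.-]", "_");
  }
}
```

For the job in the log below, a producer for a system named "kafka" would
get samza_producer-wikipedia_feed-1-kafka instead of
samza_producer-wikipedia_feed-1.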

On Thu, Jul 20, 2017 at 10:43 AM, Navina Ramesh (Apache) 
wrote:

> Hi David,
>
> I think this is expected to occur as a warning since we spin up all kafka
> clients with the same client-id, which is $job.name + $job.id.
>
> As Jagadish mentioned, it will be great if you can provide us the entire
> log so that we can take a look.
>
> As a side note for the samza contributors, I do believe the container spins
> up kafka clients for each kafka systems defined, even if it is not used.
> Iirc, we use `KafkaUtil.getClientId` for generating the client id. Perhaps
> it makes sense to append another identifier with the client id (such as
> system name or component name). That way, we won't lose the kafka-client
> related metrics and there will be no overlap between the client ids.
> Thoughts?
>
> Thanks!
> Navina
>
> On Thu, Jul 20, 2017 at 9:13 AM, Jagadish Venkatraman <
> jagadish1...@gmail.com> wrote:
>
> > Can you share the entire log file if that's okay? The warning should be a
> > red-herring IMHO.
> >
> > On Thu, Jul 20, 2017 at 7:50 AM Davide Simoncelli <
> netcelli@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > Thanks for the reply.
> > >
> > > It is a warning, but the application fails. Here is the logging:
> > >
> > >
> > > 2017-07-20 10:43:06.349 [main] AppInfoParser [INFO] Kafka version :
> > 0.10.1.1
> > > 2017-07-20 10:43:06.349 [main] AppInfoParser [INFO] Kafka commitId :
> > > f10ef2720b03b247
> > > 2017-07-20 10:43:06.351 [main] AppInfoParser [WARN] Error registering
> > > AppInfo mbean
> > > javax.management.InstanceAlreadyExistsException:
> > > kafka.producer:type=app-info,id=samza_producer-wikipedia_feed-1
> > > at com.sun.jmx.mbeanserver.Repository.addMBean(
> > Repository.java:437)
> > > at
> > > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.
> > registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
> > > at
> > > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.
> > registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
> > > at
> > > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(
> > DefaultMBeanServerInterceptor.java:900)
> > > at
> > > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(
> > DefaultMBeanServerInterceptor.java:324)
> > > at
> > > com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(
> > JmxMBeanServer.java:522)
> > > at
> > > org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(
> > AppInfoParser.java:58)
> > > at
> > > org.apache.kafka.clients.producer.KafkaProducer.<init>(
> > KafkaProducer.java:331)
> > > at
> > > org.apache.kafka.clients.producer.KafkaProducer.<init>(
> > KafkaProducer.java:163)
> > > at
> > > org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.
> > apply(KafkaSystemFactory.scala:89)
> > > at
> > > org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.
> > apply(KafkaSystemFactory.scala:89)
> > > at
> > > org.apache.samza.system.kafka.KafkaSystemProducer.send(
> > KafkaSystemProducer.scala:144)
> > > at
> > > org.apache.samza.coordinator.stream.CoordinatorStreamSystemProduce
> > r.send(CoordinatorStreamSystemProducer.java:113)
> > > at
> > > org.apache.samza.coordinator.stream.CoordinatorStreamWriter.
> > sendSetConfigMessage(CoordinatorStreamWriter.java:98)
> > > at
> > > org.apache.samza.coordinator.stream.CoordinatorStreamWriter.
> sendMessage(
> > CoordinatorStreamWriter.java:82)
> > > at
> > > org.apache.samza.job.yarn.SamzaYarnAppMasterService.onInit(
> > SamzaYarnAppMasterService.scala:68)
> > > at
> > > org.apache.samza.job.yarn.YarnClusterResourceManager.start(
> > YarnClusterResourceManager.java:180)
> > > at
> > > org.apache.samza.clustermanager.ContainerProcessManager.start(
> > ContainerProcessManager.java:167)
> > > at
> > > org.apache.samza.clustermanager.ClusterBasedJobCoordinator.run(
> > ClusterBasedJobCoordinator.java:154)
> > > at
> > > org.apache.samza.clustermanager.ClusterBasedJobCoordinator.main(
> > ClusterBasedJobCoordinator.java:222)
> > > 2017-07-20 10:43:06.549 [main] CoordinatorStreamWriter [INFO] Stopping
> > the
> > > coordinator stream producer.
> > > 2017-07-20 10:43:06.549 [main] CoordinatorStreamSystemProducer [INFO]
> > > Stopping coordinator stream producer.
> > > 2017-07-20 10:43:06.549 [main] KafkaProducer [INFO] Closing the Kafka
> > > producer with timeoutMillis = 9223372036854775807 ms.
> > >
> > >
> > > > On 20 Jul 2017, at 3:16 pm, Jagadish Venkatraman <
> > jagadish1...@gmail.com>
> > > wrote:
> > > >
> > > > Hi Davide,
> > > >
> > > > Is this logged as an error or as a warning?
> > > >
> > > > IIUC, this warning should not fail the job. It may cause some MBean
> > > > sensors / metrics emitted from Kafka to not be reported correctly
> > > > (since those are reported per-clientId).
> > > >
> > > > The job 

Re: Noise in Logs After Upgrade to 0.13

2017-08-21 Thread Prateek Maheshwari
Hi Jeremiah,

Can you check if your classpath has multiple SLF4J bindings and if it's
picking something other than log4j (e.g. logback) instead? If that's the
case, the other implementation might not have an explicit configuration and
default to logging everything to stdout.

You can verify by checking your container stdout/stderr. Does it have the
following warning: "Class path contains multiple SLF4J bindings."? If so,
the actual implementation being used should also be logged there.

See also: https://www.slf4j.org/codes.html#multiple_bindings
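
If the container stdout is hard to get at, the same check can be sketched from inside the JVM. This is an illustration only, and it assumes an SLF4J 1.x setup, where each binding ships a class at org/slf4j/impl/StaticLoggerBinder.class; the class and method names below are made up for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Illustrative helper: lists every classpath location that provides an
// SLF4J 1.x binding. More than one entry means multiple bindings.
final class Slf4jBindingScan {
    // Assumption: SLF4J 1.x, which discovers bindings via this resource path.
    static final String BINDER_RESOURCE = "org/slf4j/impl/StaticLoggerBinder.class";

    static List<URL> findResources(String resource) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        try {
            return Collections.list(loader.getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<URL> bindings = findResources(BINDER_RESOURCE);
        if (bindings.isEmpty()) {
            System.out.println("No SLF4J 1.x binding found on the classpath.");
        } else if (bindings.size() > 1) {
            System.out.println("Multiple SLF4J bindings found:");
        }
        for (URL url : bindings) {
            System.out.println("  " + url);
        }
    }
}
```

Each printed URL names the jar that contributes a binding, which should make it easier to decide which dependency to exclude.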

Thanks,
Prateek

On Mon, Aug 21, 2017 at 11:44 AM, Jeremiah Adams 
wrote:

> I am seeing tons of noise in container logs after upgrading to samza 0.13.
> Changes to my job's log4j properties aren't affecting this output. Also,
> logs are now pointing output to stdout instead of the samza-container-n.log
> file.
>
>
> 18:42:19.761 [SAMZA-JVM-METRICS] DEBUG org.apache.samza.metrics.JvmMetrics
> - updating jvm metrics?
>
>
> What log4j file do I need to edit to correct this?
>
> Jeremiah Adams
> Software Engineer
> www.helixeducation.com
> Blog | Twitter | Facebook | LinkedIn
>


Re: Welcome Xinyu as new Samza PMC!

2018-01-17 Thread Prateek Maheshwari
This is great news. Congrats Xinyu, and thanks for your contributions!

> On Jan 17, 2018, at 10:39 AM, Srinivasulu Punuru  wrote:
> 
> Congrats Xinyu, Very well deserved!
> 
> 
> From: Jagadish Venkatraman 
> Sent: Wednesday, January 17, 2018 10:37:46 AM
> To: dev@samza.apache.org
> Subject: Re: Welcome Xinyu as new Samza PMC!
> 
> Big Congrats Xinyu. Thanks for your continued contributions to all aspects
> of the project!
> 
> On Wed, Jan 17, 2018 at 10:36 AM, Wei Song  wrote:
> 
>> Congrats, Xinyu!
>> 
>> --
>> Thanks
>> -Wei
>> 
>> 
>> On 1/17/18, 10:35 AM, "Navina Ramesh"  wrote:
>> 
>>Congratulations, Xinyu!
>>Thanks for all your contribution and looking forward to more 😊
>> 
>> 
>>Cheers!
>>Navina
>> 
>>
>>From: Yi Pan 
>>Sent: Wednesday, January 17, 2018 10:26:54 AM
>>To: dev@samza.apache.org
>>Subject: Welcome Xinyu as new Samza PMC!
>> 
>>Finally all the documentation procedure is completed and Xinyu Liu has
>> been
>>officially promoted to Samza PMC member! This is well deserved due to
>> his
>>continued contribution to the Samza project.
>> 
>>Please join me to welcome Xinyu as our newest PMC member!
>> 
>>Cheers!
>> 
>>-Yi Pan
>> 
>> 
>> 
> 
> 
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University



Re: Old style "low level" Tasks with alternative deployment model(s)

2018-03-16 Thread Prateek Maheshwari
Hi Thunder,

Can you please take and attach a thread dump with this?
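
The usual way is to run jstack against the process id. If attaching to the process is not convenient in your deployment, a rough in-process alternative can be sketched as below. The class name is made up for the example, and the output is less detailed than a jstack dump (no lock or monitor information).

```java
import java.util.Map;

// Illustrative helper: renders the stack of every live thread as text.
final class ThreadDumps {
    private ThreadDumps() {}

    static String dump() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread t = entry.getKey();
            sb.append('"').append(t.getName()).append('"')
              .append(" daemon=").append(t.isDaemon())
              .append(" state=").append(t.getState())
              .append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```

Calling ThreadDumps.dump() from a debug endpoint or a timer and logging the result would show where debounce-thread-0 is at that moment.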

Thanks,
Prateek

On Fri, Mar 16, 2018 at 4:47 PM, Thunder Stumpges 
wrote:

> It appears it IS hung while serializing the JobModel... very strange! I
> added some debug statements around the calls:
>
>   LOG.debug("Getting object mapper to serialize job model");  // this IS printed
>   ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
>   LOG.debug("Serializing job model"); // this IS printed
>   String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
>   LOG.info("jobModelAsString=" + jobModelStr); // this is NOT printed!
>
> Another thing I noticed is that "getObjectMapper" actually creates the
> object mapper twice!
>
> 2018-03-16 23:09:24 logback 24985 [debounce-thread-0] DEBUG
> org.apache.samza.zk.ZkUtils - Getting object mapper to serialize job model
> 2018-03-16 23:09:24 logback 24994 [debounce-thread-0] DEBUG 
> o.a.s.s.model.SamzaObjectMapper
> - Creating new object mapper and simple module
> 2018-03-16 23:09:24 logback 25178 [debounce-thread-0] DEBUG 
> o.a.s.s.model.SamzaObjectMapper
> - Adding SerDes and mixins
> 2018-03-16 23:09:24 logback 25183 [debounce-thread-0] DEBUG 
> o.a.s.s.model.SamzaObjectMapper
> - Adding custom ContainerModel deserializer
> 2018-03-16 23:09:24 logback 25184 [debounce-thread-0] DEBUG 
> o.a.s.s.model.SamzaObjectMapper
> - Setting up naming strategy and registering module
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG 
> o.a.s.s.model.SamzaObjectMapper
> - Done!
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG 
> o.a.s.s.model.SamzaObjectMapper
> - Creating new object mapper and simple module
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG 
> o.a.s.s.model.SamzaObjectMapper
> - Adding SerDes  and mixins
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG 
> o.a.s.s.model.SamzaObjectMapper
> - Adding custom ContainerModel deserializer
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG 
> o.a.s.s.model.SamzaObjectMapper
> - Setting up naming strategy and registering module
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG 
> o.a.s.s.model.SamzaObjectMapper
> - Done!
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
> org.apache.samza.zk.ZkUtils - Serializing job model
>
> Could this ObjectMapper be a singleton? I see there is a private static
> instance, but getObjectMapper creates a new one every time...
>
> Anyway, then it takes off to serialize the job model and never comes
> back...
>
> Hoping someone has some idea here... the implementation for this mostly
> comes from Jackson-mapper-asl, and I have the version that is linked in the
> 0.14.0 tag:
> |||+--- org.codehaus.jackson:jackson-mapper-asl:1.9.13
> ||||\--- org.codehaus.jackson:jackson-core-asl:1.9.13
>
> Thanks!
> Thunder
>
> -Original Message-
> From: Thunder Stumpges [mailto:tstump...@ntent.com]
> Sent: Friday, March 16, 2018 15:29
> To: dev@samza.apache.org; Jagadish Venkatraman 
> Cc: t...@recursivedream.com; yi...@linkedin.com; Yi Pan <
> nickpa...@gmail.com>
> Subject: RE: Old style "low level" Tasks with alternative deployment
> model(s)
>
> So, my investigation starts at StreamProcessor.java.  Line 294 in method
> onNewJobModel() logs an INFO message that it's starting a container. This
> message never appears.
>
> I see that ZkJobCoordinator calls onNewJobModel from its
> onNewJobModelConfirmed method which also logs an info message stating
> "version X of the job model got confirmed". I never see this message
> either, so I go up the chain some more.
>
> I DO see:
>
> 2018-03-16 21:43:58 logback 20498 [ZkClient-EventThread-13-10.0.127.114:2181]
> INFO  o.apache.samza.zk.ZkJobCoordinator - ZkJobCoordinator::onBecomeLeader
> - I became the leader!
> And
> 2018-03-16 21:44:18 logback 40712 [debounce-thread-0] INFO
> o.apache.samza.zk.ZkJobCoordinator - 
> pid=91e07d20-ae33-4156-a5f3-534a95642133Generated
> new Job Model. Version = 1
>
> Which led me to method onDoProcessorChange line 210. I see that line, but
> not the line below " Published new Job Model. Version =" so something in
> here is not completing:
>
> LOG.info("pid=" + processorId + "Generated new Job Model. Version = "
> + nextJMVersion);
>
> // Publish the new job model
> zkUtils.publishJobModel(nextJMVersion, jobModel);
>
> // Start the barrier for the job model update
> barrier.create(nextJMVersion, currentProcessorIds);
>
> // Notify all processors about the new JobModel by updating JobModel Version number
> zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
>
> LOG.info("pid=" + processorId + "Published new Job Model. Version = "
> + nextJMVersion);
>
> As I mentioned, after the line "Generated new Job Model. Version = 1" I
> just get repeated zk ping responses.. no more application logging.
>
> The very next thing that's run 

Re: Old style "low level" Tasks with alternative deployment model(s)

2018-03-20 Thread Prateek Maheshwari
Glad you were able to figure it out, that was very confusing. Thanks for
the fix too.
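
For anyone who finds this thread later, the failure mode is easy to reproduce: a task handed to a ScheduledExecutorService that throws an Error has the Throwable captured in the returned Future, so nothing is logged unless someone inspects that Future. The sketch below is not the code from the pull request; the class and method names are made up to illustrate the general idea of catching Throwable around the task.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustrative wrapper: reports any Throwable (including Errors such as
// NoSuchMethodError) instead of letting the executor capture it silently.
final class ReportingTasks {
    private ReportingTasks() {}

    static Runnable reporting(Runnable task, Consumer<Throwable> onFailure) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                onFailure.accept(t);
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        Runnable broken = () -> { throw new NoSuchMethodError("simulated missing method"); };

        // Unwrapped: the Error ends up inside the discarded Future, nothing is printed.
        executor.schedule(broken, 10, TimeUnit.MILLISECONDS);

        // Wrapped: the failure is reported through the handler.
        executor.schedule(
            reporting(broken, t -> System.err.println("scheduled task failed: " + t)),
            10, TimeUnit.MILLISECONDS);

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

Catching Throwable is normally discouraged, but at the top of a background task it seems a reasonable trade-off, because the alternative is a hang with no log output.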

- Prateek

On Mon, Mar 19, 2018 at 9:58 PM, Thunder Stumpges 
wrote:

> And that last issue was mine. My setting override was not picked up and it
> was using GroupByContainerCount instead.
> -Thanks,
> Thunder
>
>
> -Original Message-
> From: Thunder Stumpges
> Sent: Monday, March 19, 2018 20:58
> To: dev@samza.apache.org
> Cc: Jagadish Venkatraman ; t...@recursivedream.com;
> yi...@linkedin.com; Yi Pan 
> Subject: RE: Old style "low level" Tasks with alternative deployment
> model(s)
>
> Well I figured it out. My specific issue was due to a simple dependency
> problem where I had gotten an older version of the Jackson-mapper library.
> However the code was throwing NoSuchMethodError (an Error instead of
> Exception) and being silently dropped. I created a pull request to handle
> any Throwable in ScheduleAfterDebounceTime.
> https://github.com/apache/samza/pull/450
>
> I'm now running into an issue with the generation of the JobModel and the
> ProcessorId. The ZkJobCoordinator has a ProcessorId that is a Guid, but
> when GroupByContainerIds class (my TaskNameGrouper) creates the
> ContainerModels, it is using the ContainerId (a numeric value, 0,1,2,etc)
> as the ProcessorId (~ line 105). This results in the JobModel that is
> generated and published immediately causing the processor to quit with this
> message:
>
> INFO  o.apache.samza.zk.ZkJobCoordinator - New JobModel does not contain
> pid=38c637bf-9c2b-4856-afc4-5b1562711cfb. Stopping this processor.
>
> I was assuming I should be using GroupByContainerIds as my
> TaskNameGrouper. I don't see any other promising implementations. Am I just
> missing something?
>
> Thanks,
> Thunder
>
> JobModel
> {
>   "config" : {
>   ...
>   },
>   "containers" : {
> "0" : {
>   "tasks" : {
> "Partition 0" : {
>   "task-name" : "Partition 0",
>   "system-stream-partitions" : [ {
> "system" : "kafka",
> "partition" : 0,
> "stream" : "test_topic1"
>   }, {
> "system" : "kafka",
> "partition" : 0,
> "stream" : "test_topic2"
>   } ],
>   "changelog-partition" : 0
> },
> "Partition 1" : {
>   "task-name" : "Partition 1",
>   "system-stream-partitions" : [ {
> "system" : "kafka",
> "partition" : 1,
> "stream" : "test_topic1"
>   }, {
> "system" : "kafka",
> "partition" : 1,
> "stream" : "test_topic2"
>   } ],
>   "changelog-partition" : 1
> }
>   },
>   "container-id" : 0,
>   "processor-id" : "0"
> }
>   },
>   "max-change-log-stream-partitions" : 2,
>   "all-container-locality" : {
> "0" : null
>   }
> }
>
> -Original Message-
> From: Thunder Stumpges [mailto:tstump...@ntent.com]
> Sent: Friday, March 16, 2018 18:21
> To: dev@samza.apache.org
> Cc: Jagadish Venkatraman ; t...@recursivedream.com;
> yi...@linkedin.com; Yi Pan 
> Subject: RE: Old style "low level" Tasks with alternative deployment
> model(s)
>
> Attached. I don't see any threads actually running this code which is odd.
>
> There's my main thread that's waiting for the whole thing to finish, the
> "debounce-thread-0" (which logged the other surrounding messages below) has
> this:
>
> "debounce-thread-0" #18 daemon prio=5 os_prio=0 tid=0x7fa0fd719800
> nid=0x21 waiting on condition [0x7fa0d0d45000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x0006f166e350> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.
> java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$
> ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at java.util.concurrent.ScheduledThreadPoolExecutor$
> DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
> at java.util.concurrent.ScheduledThreadPoolExecutor$
> Dela

Re: Urgent : Help with latency / backlog / topic lag

2018-06-08 Thread Prateek Maheshwari
Hi Thunder,

> What we believe may be happening is that most of the topics have no
> backlog, but one topic has all the backlog (this is because one of the
> topics accounts for ~60% of the total message rate).  Could there be
> something inducing extra latency on processing the one topic with a backlog
> just having a bunch of other topics with NO backlog?

This seems very similar to this issue:
https://issues.apache.org/jira/browse/SAMZA-1599
This was fixed in https://github.com/apache/samza/pull/436, and the fix
should be available in the 0.14.1 version.
Would it be possible to try upgrading to 0.14.1? It should be backwards
compatible with 0.14.0.

For something you can try without upgrading: try setting
"job.container.single.thread.mode" to true. From the configuration reference
<https://samza.apache.org/learn/documentation/latest/jobs/configuration-table.html>:
"If set to true, samza will fallback to legacy single-threaded event loop.
Default is false, which enables the multithreading execution."
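
In a job properties file this is the single line job.container.single.thread.mode=true. If your job config is assembled in code, a minimal sketch of the override could look like the following. The helper class is hypothetical; only the config key comes from the reference above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: returns a copy of the job config with the
// single-threaded run loop switched on or off.
final class RunLoopOverrides {
    static final String SINGLE_THREAD_MODE = "job.container.single.thread.mode";

    private RunLoopOverrides() {}

    static Map<String, String> withSingleThreadMode(Map<String, String> jobConfig, boolean enabled) {
        Map<String, String> copy = new HashMap<>(jobConfig);
        copy.put(SINGLE_THREAD_MODE, Boolean.toString(enabled));
        return copy;
    }
}
```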

Let us know if this doesn't help.

Thanks,
Prateek

On Fri, Jun 8, 2018 at 1:35 PM, Thunder Stumpges 
wrote:

> We have a new samza job which we just put into production. This job
> processes many topics (~30) but the total rate is not that high (~1200/sec
> in aggregate). I am unable to get above ~700/sec and have a growing backlog.
>
> We are running samza 0.12 (I have an update to 0.14 that is not tested or
> pushed yet).  When we load tested with a single topic, we could easily do
> several thousand per second. The latency of a single message is about 0.5ms
> as recorded by our timer metric on our 'process' call.
>
> What we believe may be happening is that most of the topics have no
> backlog, but one topic has all the backlog (this is because one of the
> topics accounts for ~60% of the total message rate).  Could there be
> something inducing extra latency on processing the one topic with a backlog
> just having a bunch of other topics with NO backlog?
>
> Some things I have tried:
>
>
>   1.  Increasing thread pool (10->20->30), no change
>   2.  Going from 1 container to 2, no help (the two containers run at half
> the speed and total is the same)
>   3.  Increasing task.max.concurrency from 1 -> 2 -> 3  (this had some
> minor help going from 1 to 2, but not enough)
>   4.  Increasing fetch.threshold.bytes (currently at 100,000 and we have
> pretty small messages)
>
> Some observed metrics:
>
>
>   *   "Pending Messages" are > 0  (15+ on some partitions)
>   *   "Messages in flight" is almost always 0
>   *   Polls rate is ~50/sec
>   *   Message chooser "Choos Obj" is ~680-700/sec like our processing rate
>   *   Message chooser "choose null" is ~50/sec
>
> I'm somewhat at a loss because based on the actual processing latency we
> should easily be able to do 2000+ with just a small handful of threads.
>
> Thanks in advance, this is in production I really need a solution.
> Thunder
>
>


Re: Urgent : Help with latency / backlog / topic lag

2018-06-08 Thread Prateek Maheshwari
Just to clarify, when you say you tried single threaded mode, do you
mean that you set job.container.thread.pool.size = 1, or that you set
job.container.single.thread.mode = true?

On Fri, Jun 8, 2018 at 2:53 PM, Thunder Stumpges  wrote:
> Thanks for the quick reply. That sounds very much like what I'm seeing. I'm 
> merging in 0.14.1 to our branch now. I did try single threaded mode and 
> unfortunately that didn't seem to make a significant difference. Perhaps I do 
> need some multithreading? I'm seeing a task latency 0.2ms per message but 
> still only achieve ~700/sec
>
>
> -Original Message-
> From: Prateek Maheshwari [mailto:prateek...@gmail.com]
> Sent: Friday, June 8, 2018 13:54
> To: dev@samza.apache.org
> Subject: Re: Urgent : Help with latency / backlog / topic lag
>
> Hi Thunder,
>
>> What we believe may be happening is that most of the topics have no
> backlog, but one topic has all the backlog (this is because one of the topics 
> accounts for ~60% of the total message rate).  Could there be something 
> inducing extra latency on processing the one topic with a backlog just having 
> a bunch of other topics with NO backlog?
> This seems very similar to this issue:
> https://issues.apache.org/jira/browse/SAMZA-1599
> This was fixed in https://github.com/apache/samza/pull/436, and the fix 
> should be available in the 0.14.1 version.
> Would it be possible to try upgrading to 0.14.1? It should be backwards 
> compatible with 0.14.0.
>
> For something you can try without upgrading: try setting 
> "job.container.single.thread.mode" to true. From the configuration reference
> <https://samza.apache.org/learn/documentation/latest/jobs/configuration-table.html>:
> "If set to true, samza will fallback to legacy single-threaded event loop.
> Default is false, which enables the multithreading execution."
>
> Let us know if this doesn't help.
>
> Thanks,
> Prateek
>
> On Fri, Jun 8, 2018 at 1:35 PM, Thunder Stumpges 
> wrote:
>
>> We have a new samza job which we just put into production. This job
>> processes many topics (~30) but the total rate is not that high
>> (~1200/sec in aggregate). I am unable to get above ~700/sec and have a 
>> growing backlog.
>>
>> We are running samza 0.12 (I have an update to 0.14 that is not tested
>> or pushed yet).  When we load tested with a single topic, we could
>> easily do several thousand per second. The latency of a single message
>> is about 0.5ms as recorded by our timer metric on our 'process' call.
>>
>> What we believe may be happening is that most of the topics have no
>> backlog, but one topic has all the backlog (this is because one of the
>> topics accounts for ~60% of the total message rate).  Could there be
>> something inducing extra latency on processing the one topic with a
>> backlog just having a bunch of other topics with NO backlog?
>>
>> Some things I have tried:
>>
>>
>>   1.  Increasing thread pool (10->20->30), no change
>>   2.  Going from 1 container to 2, no help (the two containers run at
>> half the speed and total is the same)
>>   3.  Increasing task.max.concurrency from 1 -> 2 -> 3  (this had some
>> minor help going from 1 to 2, but not enough)
>>   4.  Increasing fetch.threshold.bytes (currently at 100,000 and we
>> have pretty small messages)
>>
>> Some observed metrics:
>>
>>
>>   *   "Pending Messages" are > 0  (15+ on some partitions)
>>   *   "Messages in flight" is almost always 0
>>   *   Polls rate is ~50/sec
>>   *   Message chooser "Choos Obj" is ~680-700/sec like our processing rate
>>   *   Message chooser "choose null" is ~50/sec
>>
>> I'm somewhat at a loss because based on the actual processing latency
>> we should easily be able to do 2000+ with just a small handful of threads.
>>
>> Thanks in advance, this is in production I really need a solution.
>> Thunder
>>
>>


Re: Urgent : Help with latency / backlog / topic lag

2018-06-08 Thread Prateek Maheshwari
Good to hear it's working now. Please feel free to reach out if you run
into any issues.

When you upgrade the job to 0.14.1, please remember to set
job.container.single.thread.mode back to false (or remove the
configuration). The bug in SAMZA-1599 only affects the AsyncRunLoop
implementation; setting job.container.single.thread.mode = true reverts
to the older, synchronous RunLoop implementation. Since the bug is fixed
for AsyncRunLoop in 0.14.1, you can go back to using it.

Thanks,
Prateek

On Fri, Jun 8, 2018 at 3:23 PM, Thunder Stumpges  wrote:
> I set job.container.single.thread.mode = true
>
> And actually I think we did catch up with that setting. I have since 
> completed also the merge of 0.14.1 and we are able to keep up with the input 
> now.
>
> Thanks again for the pointers and the fast response!
>
> -Original Message-
> From: Prateek Maheshwari [mailto:prateek...@gmail.com]
> Sent: Friday, June 8, 2018 15:00
> To: dev@samza.apache.org
> Subject: Re: Urgent : Help with latency / backlog / topic lag
>
> Just to clarify, when you say you tried single threaded mode, do you mean 
> that you set job.container.thread.pool.size = 1, or that you set 
> job.container.single.thread.mode = true?
>
> On Fri, Jun 8, 2018 at 2:53 PM, Thunder Stumpges  wrote:
>> Thanks for the quick reply. That sounds very much like what I'm
>> seeing. I'm merging in 0.14.1 to our branch now. I did try single
>> threaded mode and unfortunately that didn't seem to make a significant
>> difference. Perhaps I do need some multithreading? I'm seeing a task
>> latency 0.2ms per message but still only achieve ~700/sec
>>
>>
>> -Original Message-
>> From: Prateek Maheshwari [mailto:prateek...@gmail.com]
>> Sent: Friday, June 8, 2018 13:54
>> To: dev@samza.apache.org
>> Subject: Re: Urgent : Help with latency / backlog / topic lag
>>
>> Hi Thunder,
>>
>>> What we believe may be happening is that most of the topics have no
>> backlog, but one topic has all the backlog (this is because one of the 
>> topics accounts for ~60% of the total message rate).  Could there be 
>> something inducing extra latency on processing the one topic with a backlog 
>> just having a bunch of other topics with NO backlog?
>> This seems very similar to this issue:
>> https://issues.apache.org/jira/browse/SAMZA-1599
>> This was fixed in https://github.com/apache/samza/pull/436, and the fix 
>> should be available in the 0.14.1 version.
>> Would it be possible to try upgrading to 0.14.1? It should be backwards 
>> compatible with 0.14.0.
>>
>> For something you can try without upgrading: try setting
>> "job.container.single.thread.mode" to true. From the configuration
>> reference
>> <https://samza.apache.org/learn/documentation/latest/jobs/configuration-table.html>:
>> "If set to true, samza will fallback to legacy single-threaded event loop.
>> Default is false, which enables the multithreading execution."
>>
>> Let us know if this doesn't help.
>>
>> Thanks,
>> Prateek
>>
>> On Fri, Jun 8, 2018 at 1:35 PM, Thunder Stumpges 
>> wrote:
>>
>>> We have a new samza job which we just put into production. This job
>>> processes many topics (~30) but the total rate is not that high
>>> (~1200/sec in aggregate). I am unable to get above ~700/sec and have a 
>>> growing backlog.
>>>
>>> We are running samza 0.12 (I have an update to 0.14 that is not
>>> tested or pushed yet).  When we load tested with a single topic, we
>>> could easily do several thousand per second. The latency of a single
>>> message is about 0.5ms as recorded by our timer metric on our 'process' 
>>> call.
>>>
>>> What we believe may be happening is that most of the topics have no
>>> backlog, but one topic has all the backlog (this is because one of
>>> the topics accounts for ~60% of the total message rate).  Could there
>>> be something inducing extra latency on processing the one topic with
>>> a backlog just having a bunch of other topics with NO backlog?
>>>
>>> Some things I have tried:
>>>
>>>
>>>   1.  Increasing thread pool (10->20->30), no change
>>>   2.  Going from 1 container to 2, no help (the two containers run at
>>> half the speed and total is the same)
>>>   3.  Increasing task.max.concurrency from 1 -> 2 -> 3  (this had
>>> some minor help going from 1 to 2, but not enough)
>>>   4.  Increasing fetch.threshold.bytes (currently at 100,000 and we
>>> have pretty small messages)
>>>
>>> Some observed metrics:
>>>
>>>
>>>   *   "Pending Messages" are > 0  (15+ on some partitions)
>>>   *   "Messages in flight" is almost always 0
>>>   *   Polls rate is ~50/sec
>>>   *   Message chooser "Choos Obj" is ~680-700/sec like our processing rate
>>>   *   Message chooser "choose null" is ~50/sec
>>>
>>> I'm somewhat at a loss because based on the actual processing latency
>>> we should easily be able to do 2000+ with just a small handful of threads.
>>>
>>> Thanks in advance, this is in production I really need a solution.
>>> Thunder
>>>
>>>


Re: Samza 0.14.1 : OffsetOutOfRangeException even with auto.offset.reset=smallest

2018-07-09 Thread Prateek Maheshwari
Hi Thunder,

Can you provide debug level logs from KafkaSystemConsumer with the
stack trace for the exception? It'll help figure out why the
auto.offset.reset property isn't taking effect.

If this error is due to an older checkpoint for the stream, you can
try resetting the checkpoint using the following two configurations:
streams.stream-id.samza.reset.offset: If set to true, when a Samza
container starts up, it ignores any checkpointed offset for this
particular input stream. Its behavior is thus determined by the
samza.offset.default setting. Note that the reset takes effect every
time a container is started, which may be every time you restart your
job, or more frequently if a container fails and is restarted by the
framework.

streams.stream-id.samza.offset.default: If a container starts up
without a checkpoint, this property determines where in the input
stream we should start consuming. The value must be an OffsetType, one
of the following:
  upcoming: Start processing messages that are published after the job
starts. Any messages published while the job was not running are not
processed.
  oldest: Start processing at the oldest available message in the
system, and reprocess the entire available message history.

I.e., set 'samza.reset.offset' = true, and 'samza.offset.default' =
oldest for your stream. Let us know if this doesn't help.
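
As a sketch, the two keys for one stream can be generated like this. The helper class is made up for the example, and "my-topic" is only a placeholder: use the stream-id from your own job config, which may differ from the physical topic name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds the two per-stream settings described above.
final class OffsetResetConfig {
    private OffsetResetConfig() {}

    static Map<String, String> resetToOldest(String streamId) {
        Map<String, String> cfg = new LinkedHashMap<>();
        // Ignore any checkpointed offset for this stream on container start.
        cfg.put("streams." + streamId + ".samza.reset.offset", "true");
        // With no usable checkpoint, start from the oldest available message.
        cfg.put("streams." + streamId + ".samza.offset.default", "oldest");
        return cfg;
    }

    public static void main(String[] args) {
        // "my-topic" is a placeholder stream-id.
        resetToOldest("my-topic").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Since the reset applies on every container start, it is probably best to remove samza.reset.offset again once the job has caught up.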

Thanks,
Prateek

On Fri, Jul 6, 2018 at 11:43 AM, Thunder Stumpges  wrote:
> Hi all,
>
>
> We've just run into a strange problem with samza 0.14.1. We had a job down 
> for a bit, while kafka cleaned past our saved offsets. When starting the job 
> now, we get repeated 
> org.apache.kafka.common.errors.OffsetOutOfRangeException. And it just retries 
> over and over again. We HAVE set
>
> systems.kafka.consumer.auto.offset.reset=smallest as well. Has anyone else 
> seen this? Our understanding from the documentation is that this setting says 
> what to do if the offset is out of range.
>
>
>
> systems.system-name.consumer.auto.offset.reset : This setting determines what 
> happens if a consumer attempts to read an offset that is outside of the 
> current valid range. This could happen if the topic does not exist, or if a 
> checkpoint is older than the maximum message history retained by the brokers.
>
>
>
> This is the set of messages that keeps repeating:
>
>
>
> 2018-07-06 18:32:15 INFO  kafka.utils.VerifiableProperties - Verifying 
> properties
>
> 2018-07-06 18:32:15 INFO  kafka.utils.VerifiableProperties - Property 
> client.id is overridden to samza_consumer-stg_apollo_crawler_stream_task-1
>
> 2018-07-06 18:32:15 INFO  kafka.utils.VerifiableProperties - Property 
> metadata.broker.list is overridden to kafka-server.ntent.com:9092
>
> 2018-07-06 18:32:15 INFO  kafka.utils.VerifiableProperties - Property 
> request.timeout.ms is overridden to 3
>
> 2018-07-06 18:32:15 INFO  kafka.client.ClientUtils$ - Fetching metadata from 
> broker BrokerEndPoint(0,kafka-server,9092) with correlation id 12 for 1 
> topic(s) Set(my-topic)
>
> 2018-07-06 18:32:15 INFO  kafka.producer.SyncProducer - Connected to 
> kafka-server:9092 for producing
>
> 2018-07-06 18:32:15 INFO  kafka.producer.SyncProducer - Disconnecting from 
> kafka-server:9092
>
> 2018-07-06 18:32:15 INFO  o.a.samza.system.kafka.GetOffset - Validating 
> offset 6883929 for topic and partition [my-topic,10]
>
> 2018-07-06 18:32:15 WARN  o.a.s.s.kafka.KafkaSystemConsumer - While 
> refreshing brokers for [my-topic,10]: 
> org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested 
> offset is not within the range of offsets maintained by the server.. Retrying.
>
>
>
> Thanks!
>
> Thunder
>
>


[DISCUSS] SEP-14: System and Stream Descriptors

2018-08-06 Thread Prateek Maheshwari
Hi all,

Here's the proposal for System and Stream Descriptors - a way of
specifying system, input stream, and output stream properties in
application code instead of configurations.
https://cwiki.apache.org/confluence/display/SAMZA/SEP-14%3A+System+and+Stream+Descriptors
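
To give a feel for the idea without opening the wiki page, here is a toy sketch. These classes are hypothetical and are NOT the API proposed in the SEP; they only illustrate how a descriptor written in application code could expand into the systems.* and streams.* configs that jobs set by hand today. The stream id and serde name are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy illustration only: hypothetical classes, not the SEP-14 API.
final class ToySystemDescriptor {
    private final String systemName;
    private final String factoryClassName;

    ToySystemDescriptor(String systemName, String factoryClassName) {
        this.systemName = systemName;
        this.factoryClassName = factoryClassName;
    }

    String name() {
        return systemName;
    }

    // An input stream on this system, with an explicit stream-level serde.
    ToyInputDescriptor input(String streamId, String serdeName) {
        return new ToyInputDescriptor(this, streamId, serdeName);
    }

    Map<String, String> toConfig() {
        Map<String, String> cfg = new LinkedHashMap<>();
        cfg.put("systems." + systemName + ".samza.factory", factoryClassName);
        return cfg;
    }
}

final class ToyInputDescriptor {
    private final ToySystemDescriptor system;
    private final String streamId;
    private final String serdeName;

    ToyInputDescriptor(ToySystemDescriptor system, String streamId, String serdeName) {
        this.system = system;
        this.streamId = streamId;
        this.serdeName = serdeName;
    }

    // Expands the descriptor into the equivalent string configs.
    Map<String, String> toConfig() {
        Map<String, String> cfg = new LinkedHashMap<>(system.toConfig());
        cfg.put("streams." + streamId + ".samza.system", system.name());
        cfg.put("streams." + streamId + ".samza.msg.serde", serdeName);
        return cfg;
    }
}
```

For example, new ToySystemDescriptor("kafka", "org.apache.samza.system.kafka.KafkaSystemFactory").input("page-views", "json").toConfig() yields three config entries that would otherwise be written out in the job properties file.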

Here's the PR with an implementation for the proposal above:
https://github.com/apache/samza/pull/603

Please let me know if you have any questions or feedback for the SEP above.

Thanks,
Prateek


Re: [DISCUSS] SEP-14: System and Stream Descriptors

2018-08-21 Thread Prateek Maheshwari
Hi folks,

I updated SEP-14 based on some feedback and discussions on the PR. Major 
changes are:

1. We will not support system level serdes _out of the box_ (e.g., for
GenericSystemDescriptors). Users must specify a stream level serde when creating
Input/OutputStreamDescriptors. System implementers may still choose to support
system level serdes for their own descriptors.

2. We will not support stream level input transformers in the initial 
implementation. We can add them later if we find this to be a common use case. 

3. We changed the SystemDescriptor base classes to optional interfaces for 
providing input and output descriptors. This allows system implementers more 
flexibility in providing input and output descriptors to users.

Please let me know if you have any questions or suggestions. If not, we can 
move SEP-14 from discussion to voting.

Thanks,
Prateek

> On Aug 6, 2018, at 4:20 PM, Prateek Maheshwari  wrote:
> 
> Hi all,
> 
> Here's the proposal for System and Stream Descriptors - a way of
> specifying systems, input and output streams properties in application
> code instead of configurations.
> https://cwiki.apache.org/confluence/display/SAMZA/SEP-14%3A+System+and+Stream+Descriptors
> 
> Here's the PR with an implementation for the proposal above:
> https://github.com/apache/samza/pull/603
> 
> Please let me know if you have any questions or feedback for the SEP above.
> 
> Thanks,
> Prateek



Re: Need Some Help w/ Gradle Build on OpenJDK 11

2018-10-11 Thread Prateek Maheshwari
Hi Jeremiah,

We fixed a Rat related issue yesterday in
https://github.com/apache/samza/pull/703/. I don't know if this is the
same issue you were running into, but might be worth trying again with
the latest master.

Thanks,
Prateek

On Wed, Oct 10, 2018 at 6:58 AM Jeremiah Adams
 wrote:
>
> Anyone have a few cycles to help me out with Apache Rat failing the build on 
> OpenJDK 11?
>
> Some things I’ve tried:
>
> Bumped Gradle to 4.10.2 (required for Java11). Bumped the Rat version to 0.12.
>
>
>
> Jeremiah Adams
> Software Engineer
> www.helixeducation.com
> Blog | Twitter | Facebook | LinkedIn
>
> 
> From: Jeremiah Adams 
> Sent: Tuesday, October 2, 2018 11:58 AM
> To: dev@samza.apache.org
> Subject: [POSSIBLE PHISHING] Need Some Help w/ Gradle Build on OpenJDK 11
>
> I know very little about Gradle.  I got a response on the issue I opened 
> regarding Gradle builds failing on OpenJDK 11. I've since upgraded the Gradle 
> version in gradle-wrapper.properties to 4.10.2. Build now gets past the 
> java11 issue but is dying on the Apache rat task.  The build/plugin can't 
> find a stylesheet.  I've had no luck chasing this down, can anyone point me 
> in the right direction?
>
> Output from build:
>
> jeremiah:samza jeremiah$ gradle --stacktrace clean build
> > Task :rat FAILED
>
> FAILURE: Build failed with an exception.
>
> * Where:
> Script '/Users/jeremiah/projects/open_source/samza/gradle/rat.gradle' line: 90
>
> * What went wrong:
> Execution failed for task ':rat'.
> > stylesheet 
> > /Users/jeremiah/.gradle/daemon/4.10.2/gradle/resources/rat-output-to-html.xsl
> >  doesn't exist.
>
> * Try:
> Run with --info or --debug option to get more log output. Run with --scan to 
> get full insights.
>
> * Exception is:
> org.gradle.api.tasks.TaskExecutionException: Execution failed for task ':rat'.
> at 
> org.gradle.api.internal.tasks.execution.ExecuteActionsTaskExecuter.executeActions(ExecuteActionsTaskExecuter.java:110)
> at 
> org.gradle.api.internal.tasks.execution.ExecuteActionsTaskExecuter.execute(ExecuteActionsTaskExecuter.java:77)
> at 
> org.gradle.api.internal.tasks.execution.OutputDirectoryCreatingTaskExecuter.execute(OutputDirectoryCreatingTaskExecuter.java:51)
> at 
> org.gradle.api.internal.tasks.execution.SkipUpToDateTaskExecuter.execute(SkipUpToDateTaskExecuter.java:59)
> at 
> org.gradle.api.internal.tasks.execution.ResolveTaskOutputCachingStateExecuter.execute(ResolveTaskOutputCachingStateExecuter.java:54)
> at 
> org.gradle.api.internal.tasks.execution.ValidatingTaskExecuter.execute(ValidatingTaskExecuter.java:59)
> at 
> org.gradle.api.internal.tasks.execution.SkipEmptySourceFilesTaskExecuter.execute(SkipEmptySourceFilesTaskExecuter.java:101)
> at 
> org.gradle.api.internal.tasks.execution.FinalizeInputFilePropertiesTaskExecuter.execute(FinalizeInputFilePropertiesTaskExecuter.java:44)
> at 
> org.gradle.api.internal.tasks.execution.CleanupStaleOutputsExecuter.execute(CleanupStaleOutputsExecuter.java:91)
> at 
> org.gradle.api.internal.tasks.execution.ResolveTaskArtifactStateTaskExecuter.execute(ResolveTaskArtifactStateTaskExecuter.java:62)
> at 
> org.gradle.api.internal.tasks.execution.SkipTaskWithNoActionsExecuter.execute(SkipTaskWithNoActionsExecuter.java:59)
> at 
> org.gradle.api.internal.tasks.execution.SkipOnlyIfTaskExecuter.execute(SkipOnlyIfTaskExecuter.java:54)
> at 
> org.gradle.api.internal.tasks.execution.ExecuteAtMostOnceTaskExecuter.execute(ExecuteAtMostOnceTaskExecuter.java:43)
> at 
> org.gradle.api.internal.tasks.execution.CatchExceptionTaskExecuter.execute(CatchExceptionTaskExecuter.java:34)
> at 
> org.gradle.api.internal.tasks.execution.EventFiringTaskExecuter$1.run(EventFiringTaskExecuter.java:51)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:300)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:292)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor.execute(DefaultBuildOperationExecutor.java:174)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor.run(DefaultBuildOperationExecutor.java:90)
> at 
> org.gradle.internal.operations.DelegatingBuildOperationExecutor.run(DelegatingBuildOperationExecutor.java:31)
> at 
> org.gradle.api.internal.tasks.execution.EventFiringTaskExecuter.execute(EventFiringTaskExecuter.java:46)
> at 
> org.gradle.execution.taskgraph.LocalTaskInfoExecutor.execute(LocalTaskInfoExecutor.java:42)
> at 
> org.gradle.execution.taskgraph.DefaultTaskExecutionGraph$BuildOperationAwareWorkItemExecutor.execute(DefaultTaskExecutionGraph.java:277)
> at 
> org.gradle.execution.taskgraph.DefaultTaskExecutionGraph$

[VOTE] SEP-14: System and Stream Descriptors

2018-10-12 Thread Prateek Maheshwari
Hi folks,

Now that SAMZA-1804 has been implemented and reviewed, we've updated
SEP-14 with the latest APIs and design decisions.

Please vote for accepting SEP-14 in its current form for the upcoming
Samza 1.0 release.
https://cwiki.apache.org/confluence/display/SAMZA/SEP-14%3A+System+and+Stream+Descriptors

Thanks,
Prateek


Re: [VOTE] SEP-13: unified ApplicationDescriptor and ApplicationRunner APIs for high and low- level APIs in YARN and standalone deployment

2018-10-12 Thread Prateek Maheshwari
+1 (non-binding) from me. Thanks for making the changes and updating the SEP!

- Prateek

On Fri, Oct 12, 2018 at 12:15 PM Yi Pan  wrote:
>
> Hi, all,
>
> Given SAMZA-1789 has been reviewed and implemented, SEP-13 has been updated
> to the latest API classes as well. Please vote on whether there is further
> breaking changes needed in the API, or we can accept this proposal and seal
> it for 1.0.
>
> https://cwiki.apache.org/confluence/display/SAMZA/SEP-13%3A+unify+high-+and+low-level+user+applications+in+YARN+and+standalone
>
> Thanks a lot!
>
> This email serves as my +1 (binding) to accept SEP-13.
>
> -Yi


Re: [VOTE] SEP-15: New Runtime Context API

2018-10-15 Thread Prateek Maheshwari
+1 (non-binding) for these changes.

- Prateek

> On Oct 12, 2018, at 3:27 PM, Cameron Lee  wrote:
> 
> Hi all,
> 
> SEP-15 has been updated now that SAMZA-1714 has been reviewed and implemented.
> Please vote on whether there are further breaking changes needed in the API 
> or we can accept this proposal to be included in Samza 1.0.
> 
> https://cwiki.apache.org/confluence/display/SAMZA/SEP-15%3A+New+Runtime+Context+API
> 
> Thank you,
> Cameron



Re: [VOTE] SEP-15: New Runtime Context API

2018-10-15 Thread Prateek Maheshwari
+1 (non-binding) for these changes.

(Resending from a non-LI email due to email delivery issues)

- Prateek

On Fri, Oct 12, 2018 at 3:28 PM Cameron Lee  wrote:
>
> Hi all,
>
> SEP-15 has been updated now that SAMZA-1714 has been reviewed and implemented.
> Please vote on whether there are further breaking changes needed in the API 
> or we can accept this proposal to be included in Samza 1.0.
>
> https://cwiki.apache.org/confluence/display/SAMZA/SEP-15%3A+New+Runtime+Context+API
>
> Thank you,
> Cameron


Re: [VOTE] SEP-14: System and Stream Descriptors

2018-10-17 Thread Prateek Maheshwari
Hi all,

This vote has been up for over 72 hours and has 3 binding votes, so
SEP-14 is now accepted. Thanks for reviewing and voting.
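
For reference, the acceptance check used above can be written down as a small sketch. The 72 hour window and the threshold of 3 binding votes come from this message; treating any binding -1 as blocking is my assumption, and the function name, the vote tuples and the closing time are illustrative only.

```python
from datetime import datetime, timedelta


def vote_accepted(opened, now, votes, min_hours=72, min_binding=3):
    """Return True if a vote can be closed as accepted.

    votes is a list of (value, binding) pairs, where value is +1, 0 or -1.
    Assumption: a single binding -1 blocks acceptance.
    """
    open_long_enough = now - opened >= timedelta(hours=min_hours)
    binding_plus_ones = sum(1 for value, binding in votes if value == 1 and binding)
    blocked = any(value == -1 and binding for value, binding in votes)
    return open_long_enough and binding_plus_ones >= min_binding and not blocked


# Illustrative tally: three binding +1s and one non-binding +1.
opened = datetime(2018, 10, 12, 12, 30)
closed = datetime(2018, 10, 17, 12, 30)
votes = [(1, True), (1, True), (1, True), (1, False)]
print(vote_accepted(opened, closed, votes))
```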

- Prateek

On Mon, Oct 15, 2018 at 10:42 AM Jagadish Venkatraman
 wrote:
>
> +1 binding from my side, thanks! This is a great addition to Samza-1.0
>
> On Fri, Oct 12, 2018 at 12:30 PM Prateek Maheshwari 
> wrote:
>
> > Hi folks,
> >
> > Now that SAMZA-1804 has been implemented and reviewed, we've updated
> > SEP-14 with the latest APIs and design decisions.
> >
> > Please vote for accepting SEP-14 in its current form for the upcoming
> > Samza 1.0 release.
> >
> > https://cwiki.apache.org/confluence/display/SAMZA/SEP-14%3A+System+and+Stream+Descriptors
> >
> > Thanks,
> > Prateek
> >
>
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University


Re: [VOTE] SEP-15: New Runtime Context API

2018-10-19 Thread Prateek Maheshwari
Hi all,

This VOTE has been up for > 72 hours and has 4 binding and 1
non-binding votes with no vetoes. SEP-15 is now accepted. Thanks for
the contribution, Cameron!

- Prateek

On Tue, Oct 16, 2018 at 7:35 PM Jake Maes  wrote:
>
> +1 (binding)
>
> On Tue, Oct 16, 2018 at 3:00 PM Yi Pan  wrote:
>
> > +1 (binding)
> >
> > This has been long-waited feature to allow us to have better control and
> > access to shared object in different scope of context!
> >
> > -Yi
> >
> > On Mon, Oct 15, 2018 at 10:47 AM Jagadish Venkatraman <
> > jagadish1...@gmail.com> wrote:
> >
> > > +1 (binding) from my side.
> > >
> > > LGTM
> > >
> > > On Mon, Oct 15, 2018 at 10:44 AM Prateek Maheshwari  > >
> > > wrote:
> > >
> > > > +1 (non-binding) for these changes.
> > > >
> > > > (Resending from a non-LI email due to email delivery issues)
> > > >
> > > > - Prateek
> > > > On Fri, Oct 12, 2018 at 3:28 PM Cameron Lee 
> > wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > SEP-15 has been updated now that SAMZA-1714 has been reviewed and
> > > > implemented.
> > > > > Please vote on whether there are further breaking changes needed in
> > the
> > > > API or we can accept this proposal to be included in Samza 1.0.
> > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/SAMZA/SEP-15%3A+New+Runtime+Context+API
> > > > >
> > > > > Thank you,
> > > > > Cameron
> > > >
> > >
> > >
> > > --
> > > Jagadish V,
> > > Graduate Student,
> > > Department of Computer Science,
> > > Stanford University
> > >
> >


[VOTE] Apache Samza 1.0.0 RC0

2018-10-19 Thread Prateek Maheshwari
Hi all,

This is a call for a vote on a release of Apache Samza 1.0.0. Thanks to
everyone who has contributed to this release.

The release candidate can be downloaded from here:
http://home.apache.org/~pmaheshwari/samza-1.0.0-rc0/

The release candidate is signed with pgp key 6585B3D7, which can be found
on keyservers: https://pgp.mit.edu/pks/lookup?op=get&search=0x6585B3D7

The git tag is release-1.0.0-rc0 and signed with the same pgp key:
https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=refs/tags/release-1.0.0-rc0

Test binaries have been published to Maven's staging repository, and are
available here:
https://repository.apache.org/content/repositories/orgapachesamza-1051/

The vote will be open for 72 hours (ending at 7:00 PM PST Monday, 10/22/2018).

Please download the release candidate, check the hashes/signature, build it
and test it, and then please vote:

[ ] +1 approve

[ ] +0 no opinion

[ ] -1 disapprove (and reason why)
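
If it helps, the "check the hashes" step can be sketched as below. This assumes a SHA-512 checksum whose published file starts with the hex digest; the actual algorithm and file layout for this RC may differ, and the helper names are made up. Signature verification with the pgp key is a separate step that this sketch does not cover.

```python
import hashlib


def sha512_of(path, chunk_size=1 << 20):
    """Compute the SHA-512 hex digest of a file, reading it in chunks."""
    digest = hashlib.sha512()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_published(path, published_text):
    """Compare a local file against the text of a published checksum file.

    Assumption: the first whitespace-separated token is the hex digest.
    """
    expected = published_text.split()[0].lower()
    return sha512_of(path) == expected
```

Reading in chunks keeps memory use flat for large release tarballs.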

For me, I ran check-all.sh, integration tests and verified the SQL console
in samza-tool tgz. So +1 (non-binding) from my side.

Thanks,
Prateek


Re: [VOTE] Apache Samza 1.0.0 RC0

2018-10-22 Thread Prateek Maheshwari
Thanks for verifying, Jagadish. I'll fix the test failure and create a
new RC and cancel the vote for this RC.
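
For anyone debugging this locally, the check that failed is a regex match on the job submission log. A minimal sketch of that match, using the pattern from the assertion message in the quoted output; the sample log line and the function name are hypothetical:

```python
import re

# Pattern taken from the assertion message in the quoted test output.
SUBMITTED = re.compile(r".*Submitted application (\w*)")


def extract_app_id(log_text):
    """Return the application id from the first matching log line, or None."""
    for line in log_text.splitlines():
        match = SUBMITTED.match(line)
        if match:
            return match.group(1)
    return None


# Hypothetical log line, only to show what a successful match looks like.
sample = "INFO YarnClientImpl: Submitted application application_1540249123456_0001"
print(extract_app_id(sample))
```

If no line matches, the job either did not start or logged in a different format, which is what the assertion in the test reports.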

- Prateek

On Mon, Oct 22, 2018 at 4:23 PM Jagadish Venkatraman
 wrote:
>
> I ran the integration test and encountered this failure:
>
> 2018-10-22 15:58:43,687 zopkio.test_runner [INFO] test_samza_jobfailed
> 2018-10-22 15:58:43,688 zopkio.test_runner [INFO] ['AssertionError: Job
> (negate_number) appears not to have started. Expected to see a log line
> matching regex: .*Submitted application (\\w*)\n']
> 2018-10-22 15:58:43,688 zopkio.test_runner [INFO]
> test_container_performancefailed
> 2018-10-22 15:58:43,688 zopkio.test_runner [INFO] ['AssertionError: Job
> (container-performance) appears not to have started. Expected to see a log
> line matching regex: .*Submitted application (\\w*)\n']
> 2018-10-22 15:58:43,688 zopkio.test_runner [INFO]
> test_kafka_read_write_performancefailed
> 2018-10-22 15:58:43,688 zopkio.test_runner [INFO] ['AssertionError: Job
> (kafka-read-write-performance) appears not to have started. Expected to see
> a log line matching regex: .*Submitted application (\\w*)\n']
>
>
> -- Jagadish
>
>
>
> On Fri, Oct 19, 2018 at 6:59 PM Prateek Maheshwari 
> wrote:
>
> > Hi all,
> >
> > This is a call for a vote on a release of Apache Samza 1.0.0. Thanks to
> > everyone who has contributed to this release.
> >
> > The release candidate can be downloaded from here:
> > http://home.apache.org/~pmaheshwari/samza-1.0.0-rc0/
> >
> > The release candidate is signed with pgp key 6585B3D7, which can be found
> > on keyservers: https://pgp.mit.edu/pks/lookup?op=get&search=0x6585B3D7
> >
> > The git tag is release-1.0.0-rc0 and signed with the same pgp key:
> >
> > https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=refs/tags/release-1.0.0-rc0
> >
> > Test binaries have been published to Maven's staging repository, and are
> > available here:
> > https://repository.apache.org/content/repositories/orgapachesamza-1051/
> >
> > The vote will be open for 72 hours (ending at 7:00 PM PST Monday,
> > 10/22/2018).
> >
> > Please download the release candidate, check the hashes/signature, build it
> > and test it, and then please vote:
> >
> > [ ] +1 approve
> >
> > [ ] +0 no opinion
> >
> > [ ] -1 disapprove (and reason why)
> >
> > For me, I ran check-all.sh, integration tests and verified the SQL console
> > in samza-tool tgz. So +1 (non-binding) from my side.
> >
> > Thanks,
> > Prateek
> >
>
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University


[CANCEL][VOTE] Apache Samza 1.0.0 RC0

2018-10-22 Thread Prateek Maheshwari
Hi all,

This is the CANCEL notification for the 1.0.0 RC0. We found an
integration test setup issue that we will fix. We will also include
the following PR in the new RC:
SAMZA-1901: Implementation of Samza SQL Shell,

Thanks,
Prateek


[VOTE] Apache Samza 1.0.0 RC1

2018-10-22 Thread Prateek Maheshwari
Hi all,

This is a call for a vote on a release of Apache Samza 1.0.0. Thanks to
everyone who has contributed to this release.

The release candidate can be downloaded from here:
http://home.apache.org/~pmaheshwari/samza-1.0.0-rc1/

The release candidate is signed with pgp key 6585B3D7, which can be found
on keyservers: https://pgp.mit.edu/pks/lookup?op=get&search=0x6585B3D7

The git tag is release-1.0.0-rc1 and signed with the same pgp key:
https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=refs/tags/release-1.0.0-rc1

Test binaries have been published to Maven's staging repository, and are
available here:
https://repository.apache.org/content/repositories/orgapachesamza-1052/

The vote will be open for 72 hours (ending at 9:00 PM PST Thursday, 10/25/2018).

Please download the release candidate, check the hashes/signature, build it
and test it, and then please vote:

[ ] +1 approve

[ ] +0 no opinion

[ ] -1 disapprove (and reason why)

For me, I ran check-all.sh, integration tests and verified the SQL console
in samza-tool tgz. So +1 (non-binding) from my side.

Thanks,
Prateek


Re: [VOTE] Apache Samza 1.0.0 RC1

2018-10-23 Thread Prateek Maheshwari
Thanks for verifying, Shanthoosh. We'll cancel this vote, fix the error
and create a new RC.

- Prateek

On Mon, Oct 22, 2018 at 10:31 PM santhosh venkat
 wrote:
>
> I tried building the release candidate(RC1) and it fails with the following
> checkstyle errors.
>
> [ant:checkstyle]
> /Users/svenkata/Documents/apache-samza-1.0.0-src/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java:43:
> 'member def type' have incorrect indentation level 5, expected level should
> be 4.
> [ant:checkstyle]
> /Users/svenkata/Documents/apache-samza-1.0.0-src/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java:43:
> 'method def' child have incorrect indentation level 5, expected level
> should be 4.
> [ant:checkstyle]
> /Users/svenkata/Documents/apache-samza-1.0.0-src/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java:44:
> 'method def' child have incorrect indentation level 5, expected level
> should be 4.
>
> Thanks.
>
> On Mon, Oct 22, 2018 at 9:09 PM Prateek Maheshwari 
> wrote:
>
> > Hi all,
> >
> > This is a call for a vote on a release of Apache Samza 1.0.0. Thanks to
> > everyone who has contributed to this release.
> >
> > The release candidate can be downloaded from here:
> > http://home.apache.org/~pmaheshwari/samza-1.0.0-rc1/
> >
> > The release candidate is signed with pgp key 6585B3D7, which can be found
> > on keyservers: https://pgp.mit.edu/pks/lookup?op=get&search=0x6585B3D7
> >
> > The git tag is release-1.0.0-rc1 and signed with the same pgp key:
> >
> > https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=refs/tags/release-1.0.0-rc1
> >
> > Test binaries have been published to Maven's staging repository, and are
> > available here:
> > https://repository.apache.org/content/repositories/orgapachesamza-1052/
> >
> > The vote will be open for 72 hours (ending at 9:00 PM PST Thursday,
> > 10/25/2018).
> >
> > Please download the release candidate, check the hashes/signature, build it
> > and test it, and then please vote:
> >
> > [ ] +1 approve
> >
> > [ ] +0 no opinion
> >
> > [ ] -1 disapprove (and reason why)
> >
> > For me, I ran check-all.sh, integration tests and verified the SQL console
> > in samza-tool tgz. So +1 (non-binding) from my side.
> >
> > Thanks,
> > Prateek
> >


[CANCEL] [VOTE] Apache Samza 1.0.0 RC1

2018-10-23 Thread Prateek Maheshwari
Hi all,

This is the CANCEL notification for the 1.0.0 RC1. We found a
checkstyle issue that we will fix in the new RC.

Thanks,
Prateek


[VOTE] Apache Samza 1.0.0 RC2

2018-10-23 Thread Prateek Maheshwari
Hi all,

This is a call for a vote on a release of Apache Samza 1.0.0. Thanks to
everyone who has contributed to this release.

The release candidate can be downloaded from here:
http://home.apache.org/~pmaheshwari/samza-1.0.0-rc2/

The release candidate is signed with pgp key 6585B3D7, which can be found
on keyservers: https://pgp.mit.edu/pks/lookup?op=get&search=0x6585B3D7

The git tag is release-1.0.0-rc2 and signed with the same pgp key:
https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=refs/tags/release-1.0.0-rc2

Test binaries have been published to Maven's staging repository, and are
available here:
https://repository.apache.org/content/repositories/orgapachesamza-1053/

The vote will be open for 72 hours (ending at 12:00 PM PST Friday, 10/26/2018).

Please download the release candidate, check the hashes/signature, build it
and test it, and then please vote:

[ ] +1 approve

[ ] +0 no opinion

[ ] -1 disapprove (and reason why)

For me, I ran check-all.sh, integration tests and verified the SQL console
in samza-tool tgz. So +1 (non-binding) from my side.

Thanks,
Prateek


Re: [VOTE] Apache Samza 1.0.0 RC2

2018-10-24 Thread Prateek Maheshwari
Hi Jagadish,

PR 755 is mis-titled. It's only adding back the tests for the old
consumer. The old consumer was already added back in
https://github.com/apache/samza/pull/740.

Thanks,
Prateek

On Wed, Oct 24, 2018 at 12:02 AM Jagadish Venkatraman
 wrote:
>
> Boris,
>
> Do users have the option to switch to use the "old" Kafka consumer if they
> encounter any issue with the "new" consumer?. If not, should we pull in
> https://github.com/apache/samza/pull/755? It is my understanding that
> PR-755 adds support for this.
>
> Thanks,
> Jagadish
>
> On Tue, Oct 23, 2018 at 2:50 PM Boris S  wrote:
>
> > Ran build, test and integration test on Linux.
> > Verified the signatures.
> >
> > +1
> >
> > On Tue, Oct 23, 2018 at 11:55 AM Prateek Maheshwari 
> > wrote:
> >
> > > Hi all,
> > >
> > > This is a call for a vote on a release of Apache Samza 1.0.0. Thanks to
> > > everyone who has contributed to this release.
> > >
> > > The release candidate can be downloaded from here:
> > > http://home.apache.org/~pmaheshwari/samza-1.0.0-rc2/
> > >
> > > The release candidate is signed with pgp key 6585B3D7, which can be found
> > > on keyservers: https://pgp.mit.edu/pks/lookup?op=get&search=0x6585B3D7
> > >
> > > The git tag is release-1.0.0-rc2 and signed with the same pgp key:
> > >
> > >
> > https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=refs/tags/release-1.0.0-rc2
> > >
> > > Test binaries have been published to Maven's staging repository, and are
> > > available here:
> > > https://repository.apache.org/content/repositories/orgapachesamza-1053/
> > >
> > > The vote will be open for 72 hours (ending at 12:00 PM PST Friday,
> > > 10/26/2018).
> > >
> > > Please download the release candidate, check the hashes/signature, build
> > it
> > > and test it, and then please vote:
> > >
> > > [ ] +1 approve
> > >
> > > [ ] +0 no opinion
> > >
> > > [ ] -1 disapprove (and reason why)
> > >
> > > For me, I ran check-all.sh, integration tests and verified the SQL
> > console
> > > in samza-tool tgz. So +1 (non-binding) from my side.
> > >
> > > Thanks,
> > > Prateek
> > >
> >
>
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University


Re: [VOTE] Apache Samza 1.0.0 RC2

2018-10-25 Thread Prateek Maheshwari
We found another issue that affects message serialization in the test
framework. We will cancel this vote, fix the issue and create another
RC soon.

Thanks,
Prateek

On Thu, Oct 25, 2018 at 12:38 AM santhosh venkat
 wrote:
>
> 1. ./bin/check-all.sh succeeded.
> 2. Both the commands ./bin/integration-tests.sh yarn-integration-tests and
> ./bin/integration-tests.sh standalone-integration-tests succeeded.
> 3. Verified the SQL console available in samza-tool tgz.
>
> +1
>
> Thanks.
>
> On Wed, Oct 24, 2018 at 10:12 PM Yi Pan  wrote:
>
> > Ran check-all and deployed locally with the test jobs. All tests passed.
> >
> > +1 (binding) from my end.
> >
> > Thanks for push the release!
> >
> > -Yi
> >
> > On Wed, Oct 24, 2018 at 8:53 AM Prateek Maheshwari 
> > wrote:
> >
> > > Hi Jagadish,
> > >
> > > PR 755 is mis-titled. Its only adding back the tests for the old
> > > consumer. The old consumer was already added back in
> > > https://github.com/apache/samza/pull/740.
> > >
> > > Thanks,
> > > Prateek
> > > On Wed, Oct 24, 2018 at 12:02 AM Jagadish Venkatraman
> > >  wrote:
> > > >
> > > > Boris,
> > > >
> > > > Do users have the option to switch to use the "old" Kafka consumer if
> > > they
> > > > encounter any issue with the "new" consumer?. If not, should we pull in
> > > > https://github.com/apache/samza/pull/755? It is my understanding that
> > > > PR-755 adds support for this.
> > > >
> > > > Thanks,
> > > > Jagadish
> > > >
> > > > On Tue, Oct 23, 2018 at 2:50 PM Boris S  wrote:
> > > >
> > > > > Ran build, test and integration test on Linux.
> > > > > Verified the signatures.
> > > > >
> > > > > +1
> > > > >
> > > > > On Tue, Oct 23, 2018 at 11:55 AM Prateek Maheshwari <
> > > prate...@utexas.edu>
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > This is a call for a vote on a release of Apache Samza 1.0.0.
> > Thanks
> > > to
> > > > > > everyone who has contributed to this release.
> > > > > >
> > > > > > The release candidate can be downloaded from here:
> > > > > > http://home.apache.org/~pmaheshwari/samza-1.0.0-rc2/
> > > > > >
> > > > > > The release candidate is signed with pgp key 6585B3D7, which can be
> > > found
> > > > > > on keyservers:
> > > https://pgp.mit.edu/pks/lookup?op=get&search=0x6585B3D7
> > > > > >
> > > > > > The git tag is release-1.0.0-rc2 and signed with the same pgp key:
> > > > > >
> > > > > >
> > > > >
> > >
> > https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=refs/tags/release-1.0.0-rc2
> > > > > >
> > > > > > Test binaries have been published to Maven's staging repository,
> > and
> > > are
> > > > > > available here:
> > > > > >
> > > https://repository.apache.org/content/repositories/orgapachesamza-1053/
> > > > > >
> > > > > > The vote will be open for 72 hours (ending at 12:00 PM PST Friday,
> > > > > > 10/26/2018).
> > > > > >
> > > > > > Please download the release candidate, check the hashes/signature,
> > > build
> > > > > it
> > > > > > and test it, and then please vote:
> > > > > >
> > > > > > [ ] +1 approve
> > > > > >
> > > > > > [ ] +0 no opinion
> > > > > >
> > > > > > [ ] -1 disapprove (and reason why)
> > > > > >
> > > > > > For me, I ran check-all.sh, integration tests and verified the SQL
> > > > > console
> > > > > > in samza-tool tgz. So +1 (non-binding) from my side.
> > > > > >
> > > > > > Thanks,
> > > > > > Prateek
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Jagadish V,
> > > > Graduate Student,
> > > > Department of Computer Science,
> > > > Stanford University
> > >
> >


[CANCEL] [VOTE] Apache Samza 1.0.0 RC2

2018-10-25 Thread Prateek Maheshwari
Hi all,

This is the CANCEL notification for the 1.0.0 RC2. We found a
test framework message serialization issue that we will fix in the new RC.

Thanks,
Prateek


[VOTE] Apache Samza 1.0.0 RC3

2018-10-29 Thread Prateek Maheshwari
Hi all,

This is a call for a vote on a release of Apache Samza 1.0.0. Thanks to
everyone who has contributed to this release.

The release candidate can be downloaded from here:
http://home.apache.org/~pmaheshwari/samza-1.0.0-rc3/

The release candidate is signed with pgp key 6585B3D7, which can be found
on keyservers: https://pgp.mit.edu/pks/lookup?op=get&search=0x6585B3D7

The git tag is release-1.0.0-rc3 and signed with the same pgp key:
https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=refs/tags/release-1.0.0-rc3

Test binaries have been published to Maven's staging repository, and are
available here:
https://repository.apache.org/content/repositories/orgapachesamza-1054/

The vote will be open for 72 hours (ending at 10:00 PM PST Thursday,
11/01/2018).

Please download the release candidate, check the hashes/signature, build it
and test it, and then please vote:

[ ] +1 approve

[ ] +0 no opinion

[ ] -1 disapprove (and reason why)

For me, I ran check-all.sh, integration tests and verified the SQL console
in samza-tool tgz. So +1 (non-binding) from my side.

Thanks,
Prateek


Re: [VOTE] Apache Samza 1.0.0 RC3

2018-10-30 Thread Prateek Maheshwari
We found an issue with Samza SQL integration with the new
ApplicationRunners APIs. We'll cancel this vote and create a new RC.

Thanks,
Prateek

On Tue, Oct 30, 2018 at 10:14 AM Jake Maes  wrote:
>
> +1 binding
>
> Ran check-all on OSX with Gradle 2.8
>
> On Mon, Oct 29, 2018 at 10:13 PM Prateek Maheshwari 
> wrote:
>
> > Hi all,
> >
> > This is a call for a vote on a release of Apache Samza 1.0.0. Thanks to
> > everyone who has contributed to this release.
> >
> > The release candidate can be downloaded from here:
> > http://home.apache.org/~pmaheshwari/samza-1.0.0-rc3/
> >
> > The release candidate is signed with pgp key 6585B3D7, which can be found
> > on keyservers: https://pgp.mit.edu/pks/lookup?op=get&search=0x6585B3D7
> >
> > The git tag is release-1.0.0-rc3 and signed with the same pgp key:
> >
> > https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=refs/tags/release-1.0.0-rc3
> >
> > Test binaries have been published to Maven's staging repository, and are
> > available here:
> > https://repository.apache.org/content/repositories/orgapachesamza-1054/
> >
> > The vote will be open for 72 hours (ending at 10:00 PM PST Thursday,
> > 11/01/2018).
> >
> > Please download the release candidate, check the hashes/signature, build it
> > and test it, and then please vote:
> >
> > [ ] +1 approve
> >
> > [ ] +0 no opinion
> >
> > [ ] -1 disapprove (and reason why)
> >
> > For me, I ran check-all.sh, integration tests and verified the SQL console
> > in samza-tool tgz. So +1 (non-binding) from my side.
> >
> > Thanks,
> > Prateek
> >


[CANCEL] [VOTE] Apache Samza 1.0.0 RC3

2018-10-30 Thread Prateek Maheshwari
Hi all,

This is the CANCEL notification for the 1.0.0 RC3. We found an issue
with Samza SQL integration with the new ApplicationRunners API that we
will fix in the new RC.

Thanks,
Prateek


Re: [VOTE] Apache Samza 1.0.0 RC4

2018-11-02 Thread Prateek Maheshwari
Verified signatures and successfully ran check-all and integration tests.

+1 (binding) from me.

Thanks,
Prateek

On Fri, Nov 2, 2018 at 2:39 PM Boris S  wrote:
>
> ran check-all and integration tests. All passed.
> verified signatures.
> +1
>
> On Wed, Oct 31, 2018 at 7:15 PM Jagadish Venkatraman 
> wrote:
>
> > Hi all,
> >
> > This is a call for a vote on a release of Apache Samza 1.0.0. Thanks to
> > everyone who has contributed to this release.
> >
> > The release candidate can be downloaded from here:
> > http://home.apache.org/~jagadish/samza-1.0.0-rc4/
> >
> > The release candidate is signed with pgp key AF81FFBF, which can be found
> > on keyservers:
> > http://pgp.mit.edu/pks/lookup?op=get&search=0xAF81FFBF
> >
> > The git tag is release-1.0.0-rc4 and signed with the same pgp key:
> >
> > https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=refs/tags/release-1.0.0-rc4
> >
> > Test binaries have been published to Maven's staging repository, and are
> > available here:
> > https://repository.apache.org/content/repositories/orgapachesamza-1055/
> >
> > The vote will be open for 72 hours (ending at 7:00 PM PST Saturday,
> > November 3).
> >
> > Please download the release candidate, check the hashes/signature, build it
> > and test it, and then please vote:
> >
> > [ ] +1 approve
> >
> > [ ] +0 no opinion
> >
> > [ ] -1 disapprove (and reason why)
> >
> > For me, I ran check-all.sh, integration tests and verified the SQL console
> > in samza-tool tgz. So +1 (binding) from my side.
> >
> > Thanks,
> > Jagadish
> >
> > --
> > Jagadish V
> >


Re: Welcome Hai Lu and Aditya Toomla as committers to Apache Samza!

2018-11-06 Thread Prateek Maheshwari
Congrats Hai and Aditya, and thanks for your contributions!

- Prateek

> On Nov 6, 2018, at 10:40 AM, Wei Song  wrote:
> 
> Congrats Hai and Aditya!
> 
> 
> 
> On 11/6/18, 10:20 AM, "Yi Pan"  wrote:
> 
>Hi, all,
> 
>All official steps are completed and please join me to welcome Hai and
>Aditya to Apache Samza community as committers! They have been making
>significant contribution to many important projects in Samza such as SQL,
>Samza-on-Hadoop, Kinesis connector, etc.
> 
>Welcome Hai and Aditya!
> 
>-Yi
> 
> 



Re: https://issues.apache.org/jira/browse/SAMZA-2039

2018-12-18 Thread Prateek Maheshwari
For notifying others, you can leave a comment on the ticket that you're working 
on it. Additionally, you can assign the ticket to yourself if you have the 
permissions to do so. 

Thanks for your interest, and please let us know if you need any help.

- Prateek

> On Dec 17, 2018, at 8:27 PM, blitzerr  wrote:
> 
> What is the protocol to work on an issue ? Do I claim it first ? How do we
> make sure multiple people are starting on the same one ?



Re: app.class or task.class for beam samza runner

2019-01-03 Thread Prateek Maheshwari
Hi Omkar,

I think it's only possible to get that exception with Samza 1.0. Can
you verify that the deployment is indeed using samza 0.14.1?
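
One way to check is to look at the Samza jars that actually end up on the deployment's classpath. A rough sketch, assuming jar names of the form samza-<module>[_<scala version>]-<version>.jar; the jar names in the example are hypothetical and the helper is not part of Samza or Beam:

```python
import re

# Assumed naming scheme: samza-<module>[_<scala version>]-<version>.jar
JAR_PATTERN = re.compile(
    r"^samza-(?P<module>[a-z0-9-]+?)(?:_(?P<scala>\d+\.\d+))?-(?P<version>\d+\.\d+\.\d+)\.jar$"
)


def samza_versions(jar_names):
    """Group Samza jars by version; more than one key means mixed versions."""
    versions = {}
    for name in jar_names:
        match = JAR_PATTERN.match(name)
        if match:
            versions.setdefault(match.group("version"), []).append(name)
    return versions


# Hypothetical lib directory listing.
jars = [
    "samza-api-0.14.1.jar",
    "samza-core_2.11-1.0.0.jar",
    "beam-runners-samza-2.9.0.jar",
]
found = samza_versions(jars)
print(sorted(found))
print("mixed versions" if len(found) > 1 else "single version")
```

A listing like the hypothetical one above would show a 1.0.0 jar next to a 0.14.1 jar, which would be consistent with the exception coming from Samza 1.0 code.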

Thanks,
Prateek

On Wed, Jan 2, 2019 at 11:40 PM Deshpande, Omkar
 wrote:
>
> Hello,
>
> I have been able to execute my Samza-Beam application in Local mode. And now 
> I am trying to run a Samza-Beam application in Standalone mode.
>
> Here is my configFile  config.properties:
>
> app.name=test-app
> job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
> job.coordinator.zk.connect=localhost:2181
> job.coordinator.system=kafka
> job.factory.class=org.apache.samza.job.local.ProcessJobFactory
> # Kafka System
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.consumer.zookeeper.connect=localhost:2181
> systems.kafka.producer.bootstrap.servers=localhost:9092
> systems.kafka.default.stream.replication.factor=1
>
> I am getting following exception:
>
> org.apache.samza.config.ConfigException: Legacy task applications must set a 
> non-empty task.class in configuration.
>
>   at 
> org.apache.samza.application.ApplicationUtil.fromConfig(ApplicationUtil.java:58)
>
>   at 
> org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:87)
>
> Versions:
> Beam: 2.9.0
> Samza: 0.14.1
>
> As per my understanding, I shouldn’t have to create implementation of 
> StreamApplication or StreamTask while using Beam SDK.
>
> An example of configFile for Samza-Beam Standalone application would be 
> helpful.
>
> Regards,
> Omkar Deshpande


Re: Beam Samza Runner - java.lang.UnsupportedOperationException: Cannot create a producer for an input system

2019-01-07 Thread Prateek Maheshwari
+ Xinyu

> On Jan 4, 2019, at 9:58 PM, Deshpande, Omkar  
> wrote:
> 
> Hello,
> 
> I am getting following exception while running Beam Samza Runner –
> 
> java.lang.UnsupportedOperationException: Cannot create a producer for an 
> input system
> 
>  at 
> org.apache.beam.runners.samza.adapter.BoundedSourceSystem$Factory.getProducer(BoundedSourceSystem.java:411)
> 
>  at 
> org.apache.samza.container.SamzaContainer$$anonfun$13.apply(SamzaContainer.scala:223)
> 
>  at 
> org.apache.samza.container.SamzaContainer$$anonfun$13.apply(SamzaContainer.scala:220)
> 
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 
>  at scala.collection.immutable.Map$Map3.foreach(Map.scala:161)
> 
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 
>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 
>  at 
> org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:220)
> 
>  at org.apache.samza.container.SamzaContainer.apply(SamzaContainer.scala)
> 
>  at 
> org.apache.samza.processor.StreamProcessor.createSamzaContainer(StreamProcessor.java:198)
> 
>  at 
> org.apache.samza.processor.StreamProcessor$1.onNewJobModel(StreamProcessor.java:290)
> 
>  at 
> org.apache.samza.zk.ZkJobCoordinator.onNewJobModelConfirmed(ZkJobCoordinator.java:304)
> 
>  at 
> org.apache.samza.zk.ZkJobCoordinator$ZkBarrierListenerImpl.lambda$onBarrierStateChanged$1(ZkJobCoordinator.java:394)
> 
>  at 
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:163)
> 
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> 
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> 
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 
>  at java.lang.Thread.run(Thread.java:748)
> 
> 2019-01-04 21:39:11 ERROR SamzaContainer$:86 - Failed to create a producer 
> for 0-KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_, so skipping.
> 
> java.lang.UnsupportedOperationException: Cannot create a producer for an 
> input system
> 
>  at 
> org.apache.beam.runners.samza.adapter.UnboundedSourceSystem$Factory.getProducer(UnboundedSourceSystem.java:452)
> 
>  at 
> org.apache.samza.container.SamzaContainer$$anonfun$13.apply(SamzaContainer.scala:223)
> 
>  at 
> org.apache.samza.container.SamzaContainer$$anonfun$13.apply(SamzaContainer.scala:220)
> 
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 
>  at scala.collection.immutable.Map$Map3.foreach(Map.scala:161)
> 
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 
>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 
>  at 
> org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:220)
> 
>  at org.apache.samza.container.SamzaContainer.apply(SamzaContainer.scala)
> 
>  at 
> org.apache.samza.processor.StreamProcessor.createSamzaContainer(StreamProcessor.java:198)
> 
>  at 
> org.apache.samza.processor.StreamProcessor$1.onNewJobModel(StreamProcessor.java:290)
> 
>  at 
> org.apache.samza.zk.ZkJobCoordinator.onNewJobModelConfirmed(ZkJobCoordinator.java:304)
> 
>  at 
> org.apache.samza.zk.ZkJobCoordinator$ZkBarrierListenerImpl.lambda$onBarrierStateChanged$1(ZkJobCoordinator.java:394)
> 
>  at 
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:163)
> 
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> 
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> 
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 
>  at java.lang.Thread.run(Thread.java:748)
> 
> This exception does not stop the execution, however I would like to 
> understand the reason for this and possible resolution.
> 
> Thanks,
> Omkar Deshpande



Re: Draft report to board - Jan 2019

2019-01-09 Thread Prateek Maheshwari
Thanks for the summary Yi. I'd change: "HDFS based backup/restore of
state stores" to "Evaluation for HDFS based backup/restore of state
stores" since this was an intern project and is not checked in to
master. Otherwise LGTM.

Thanks,
Prateek

On Wed, Jan 9, 2019 at 12:28 PM Yi Pan  wrote:
>
> Hi, all,
>
> Our quarterly report is due this Wed (1/9). The following is the draft
> report. Please let me know by the end of the day if I missed anything.
> Thanks!
>
> ## Description:
>
>  - Apache Samza is a distributed stream processing engine that is highly
>    configurable to process events from various data sources, including
>    real-time messaging systems (e.g. Kafka) and distributed file systems
>    (e.g. HDFS).
>
>
>
> ## Issues:
>
>  - No issues require board attention
>
>
>
> ## Activity:
>
>  - Samza 1.0 is released:
>
> - News coverage:
> https://www.zdnet.com/article/real-time-data-processing-just-got-more-options-linkedin-releases-apache-samza-1-0-streaming/
>
> - Engineering blogs:
> https://engineering.linkedin.com/blog/2018/11/samza-1-0--stream-processing-at-massive-scale
>
> - Major online website refresh: http://samza.apache.org/
>
>  - Critical improvement projects completed:
>
> - Changelog restore parallelization
>
> - HDFS based backup/restore of state stores
>
>  - Multiple SEP projects initiated or in-progress:
>
> - SEP-18: allows manipulating starting offsets and time-based rewind
>
> - SEP-19: Fast failover for stateful jobs on container failure (i.e.
> standby container)
>
> - SEP to come soon: async high-level API
>
>  - Beam Samza runner upgrade to use Samza 1.0
>
>  - Go and Python support via Beam Samza runner
>
>
>
> ## Health report:
>
>  - Project is in healthy status with 1.0 released in Nov 2018
>
>
>
> ## PMC changes:
>
>
>
>  - Currently 15 PMC members.
>
>  - Prateek Maheshwari was added to the PMC on Thu Nov 01 2018
>
>
>
> ## Committer base changes:
>
>
>
>  - Currently 22 committers.
>
>  - New committers:
>
> - Aditya Toomula was added as a committer on Mon Nov 05 2018
>
> - Hai Lu was added as a committer on Mon Nov 05 2018
>
>
>
> ## Releases:
>
>
>
>  - Last release was 1.0 on Nov 28, 2018
>
>
>
> ## /dist/ errors: 9
>
>  - Project is in healthy status with 1.0 released in Nov 2018
>
>
>
> ## Mailing list activity:
>
>
>
>  - dev@samza.apache.org:
>
> - 271 subscribers (down -13 in the last 3 months):
>
> - 445 emails sent to list (288 in previous quarter)
>
>
>
>
>
> ## JIRA activity:
>
>
>
>  - 111 JIRA tickets created in the last 3 months
>
>  - 57 JIRA tickets closed/resolved in the last 3 months


Re: [DISCUSS] Mandatory migration of Samza git repo to gitbox.apache.org

2019-01-15 Thread Prateek Maheshwari
Thanks for starting the discussion Pawas. I'm +1 (binding) for the migration.

- Prateek

On Tue, Jan 15, 2019 at 11:44 AM Pawas Chhokra  wrote:
>
> Hi all,
>
> As mandated by the Apache Infrastructure Team, all git repositories must be
> migrated from git-wip-us.apache.org URL to gitbox.apache.org, as the old
> service is being decommissioned. This needs to happen before February 7th,
> and this ticket  is to
> check if migrating Samza on 11 AM, Jan 25, 2019 is acceptable to everyone.
>
> Thanks & Regards,
> Pawas Chhokra


Re: [VOTE] Migration of Samza git repo to gitbox.apache.org

2019-01-23 Thread Prateek Maheshwari
+1 (binding) again

- Prateek

On Wed, Jan 23, 2019 at 11:50 AM Pawas Chhokra  wrote:
>
> Hi all,
>
> This is a call for a vote on migrating Samza git repo to gitbox.apache.org, on
> 11 AM, Jan 29, 2019. As mandated by the Apache Infrastructure Team, all git
> repositories must be migrated from git-wip-us.apache.org URL to
> gitbox.apache.org, as the old service is being decommissioned.
> The vote will be open for 72 hours (ending at 12:00 PM PST Monday,
> January 28). You can vote as follows:
>
> [ ] +1 approve
>
> [ ] +0 no opinion
>
> [ ] -1 disapprove (and reason why)
>
> The vote is +1 from my side.
>
> Thanks & Regards,
> Pawas Chhokra


Re: "send to" ordering is inconsistent

2019-02-27 Thread Prateek Maheshwari
Hi Tom,

I'm assuming that the two sub-DAGs you're talking about are the two Map ->
Send To chains acting on the "audit-report-requests" input and sending
their results to the "audit-report-status" output.

Although processing within each Task is in-order, the framework does not
guarantee the order in which multiple operators chained to the same upstream
operator are evaluated. Specifically, in the current implementation
<https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java#L106>,
an Operator's registeredOperators are maintained as a HashSet of
OperatorImpls. This would explain the out-of-order appearance of the two
messages. I'm not sure what's changed in 1.0 that makes this trigger now.
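
To illustrate the effect, here is a minimal standalone sketch. It does not use
Samza; the Op class and the operator names are hypothetical stand-ins for
OperatorImpls. Because Op does not override hashCode(), a HashSet iterates over
the two instances in an order unrelated to registration order, while a
LinkedHashSet keeps registration order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class OperatorOrderSketch {
    // Hypothetical stand-in for an operator implementation. It does not
    // override hashCode(), so a HashSet places it by identity hash, which can
    // change from one JVM run to the next.
    static final class Op {
        final String name;

        Op(String name) {
            this.name = name;
        }
    }

    // Registers the operators in the given order and returns their names in
    // the order the set iterates over them.
    static List<String> evaluationOrder(Set<Op> registered, List<Op> ops) {
        registered.addAll(ops);
        List<String> names = new ArrayList<>();
        for (Op op : registered) {
            names.add(op.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Op> ops = Arrays.asList(new Op("map-pending-sendTo"), new Op("map-done-sendTo"));
        // May print either order; nothing ties it to registration order.
        System.out.println("HashSet:       " + evaluationOrder(new HashSet<>(), ops));
        // Always prints registration order.
        System.out.println("LinkedHashSet: " + evaluationOrder(new LinkedHashSet<>(), ops));
    }
}
```

Running this a few times may or may not show the HashSet order flipping on a
given JVM; the point is only that nothing guarantees it stays stable.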

Since both sendTo and sink are terminal operators (void return type), I
don't think you'll be able to easily get around this. Let me discuss this
with the team and get back to you with a workaround / fix.

Thanks,
Prateek


On Tue, Feb 26, 2019 at 7:08 PM Tom Davis  wrote:

> Hey folks!
>
> We have noticed some inconsistencies in message ordering when running a
> StreamApplication that calls two separate `map` functions over an input
> and sends results to the same output. I have attached my Execution Plan,
> but the gist is that the first `map` function marks a thing as "pending"
> by sending a message to a status topic and the second `map` function
> does some work then sends its own status with "done".
>
> We have a test set up to read the resulting status topic with a normal
> Kafka consumer to ensure that two status messages were produced by Samza
> and consumed in the proper order (first "pending", then "done", per the
> order of the MessageStream call chains). This test flaps pretty
> routinely since upgrading to Samza 1.0; we never noticed this in the
> past. Sometimes, it times out waiting for any messages, though that's
> considerably less rare than the ordering issue. My understanding is, for
> a given Task, by default, all processing should be done serially. Is
> that no longer true? Is the guarantee *only* for the order in which
> messages are consumed, not produced?
>
> For test simplicity, there's a single Kafka partition for each topic and
> I attempted to create a configuration file that would eliminate as much
> coordination and concurrency sources as I knew how:
>
>   processor.id=0
>
> job.coordinator.factory=org.apache.samza.standalone.PassthroughJobCoordinatorFactory
>   job.container.single.thread.mode=true
>
> (We use the ZkJobCoordinatorFactory normally but both produce the bug)
>
> I realize the KafkaProducer does not *technically* guarantee delivery
> order except when using transactions, which KafkaSystemProducer doesn't
> appear to do by default. I have checked the actual message envelope and
> when the ordering is wrong, the offset order is correct -- so, "done"
> was recorded by Kafka prior to "pending". This seems to rule out Samza
> but I'm not entirely confident in that conclusion. Any thoughts?
>
> Thanks,
>
> Tom
>


Re: "send to" ordering is inconsistent

2019-02-28 Thread Prateek Maheshwari
Hi Tom,

Thanks for reporting this. I created a ticket (SAMZA-2116
<https://issues.apache.org/jira/browse/SAMZA-2116>) to make the required
API changes. We'll include this in the next Samza release, which should be
mid to late next month.

In the meantime, the workaround would be to keep all of this functionality
in a sink function. Does this work for you?
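
As a rough sketch of the idea (plain Java, not the Samza API): a single
function emits both status messages, so their relative order is fixed by
ordinary sequential execution. The names handleRequest and doWork and the
message formats are hypothetical, and a List stands in for the output topic.
In an actual job this body would presumably live in the function passed to
sink and write through the collector it receives.

```java
import java.util.ArrayList;
import java.util.List;

public class SingleSinkSketch {
    // Hypothetical placeholder for the work the second map function performs.
    static String doWork(String request) {
        return request.trim();
    }

    // Emits both status messages from one function. Sequential execution fixes
    // their relative order, so operator iteration order no longer matters.
    static void handleRequest(String request, List<String> statusOut) {
        statusOut.add("pending:" + request);
        String result = doWork(request);
        statusOut.add("done:" + result);
    }

    public static void main(String[] args) {
        List<String> statusTopic = new ArrayList<>();
        handleRequest("report-1", statusTopic);
        System.out.println(statusTopic); // [pending:report-1, done:report-1]
    }
}
```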

Thanks,
Prateek

On Wed, Feb 27, 2019 at 2:54 PM Tom Davis  wrote:

>
> Prateek Maheshwari  writes:
>
> > Hi Tom,
> >
> > I'm assuming that the two sub-DAGs you're talking about are the two Map
> ->
> > Send To chains acting on the "audit-report-requests" input and sending
> > their results to the "audit-report-status" output.
> >
>
> Yes, that's correct.
>
> >
> > Although processing within each Task is in-order, the framework does not
> > guarantee the order in which the multiple chained operators for an
> operator
> > are evaluated. Specifically, in the current implementation
> > <
> https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java#L106
> >,
> > an Operator's registeredOperators are maintained as a HashSet of
> > OperatorImpls. This would explain the out-of-order appearance of the two
> > messages. I'm not sure what's changed in 1.0 that makes this trigger now.
> >
>
> Ah! I thought this was the case but I couldn't find the part of the code
> to prove it. This makes far more sense than Kafka routinely not
> committing messages in order (though it is still technically a
> possibility).
>
> Upon further investigation, I'm not convinced it's a 1.0 issue; I think
> we just started using multiple chained operators more heavily.
>
> >
> > Since both sendTo and sink are terminal operators (void return type), I
> > don't think you'll be able to easily get around this. Let me discuss this
> > with the team and get back to you with a workaround / fix.
> >
>
> Thanks a lot! <3
>
> >
> > Thanks,
> > Prateek
> >
> >
> > On Tue, Feb 26, 2019 at 7:08 PM Tom Davis 
> wrote:
> >
> >> Hey folks!
> >>
> >> We have noticed some inconsistencies in message ordering when running a
> >> StreamApplication that calls two separate `map` functions over an input
> >> and sends results to the same output. I have attached my Execution Plan,
> >> but the gist is that the first `map` function marks a thing as "pending"
> >> by sending a message to a status topic and the second `map` function
> >> does some work then sends its own status with "done".
> >>
> >> We have a test set up to read the resulting status topic with a normal
> >> Kafka consumer to ensure that two status messages were produced by Samza
> >> and consumed in the proper order (first "pending", then "done", per the
> >> order of the MessageStream call chains). This test flaps pretty
> >> routinely since upgrading to Samza 1.0; we never noticed this in the
> >> past. Sometimes, it times out waiting for any messages, though that's
> >> considerably less rare than the ordering issue. My understanding is, for
> >> a given Task, by default, all processing should be done serially. Is
> >> that no longer true? Is the guarantee *only* for the order in which
> >> messages are consumed, not produced?
> >>
> >> For test simplicity, there's a single Kafka partition for each topic and
> >> I attempted to create a configuration file that would eliminate as much
> >> coordination and concurrency sources as I knew how:
> >>
> >>   processor.id=0
> >>
> >>
> job.coordinator.factory=org.apache.samza.standalone.PassthroughJobCoordinatorFactory
> >>   job.container.single.thread.mode=true
> >>
> >> (We use the ZkJobCoordinatorFactory normally but both produce the bug)
> >>
> >> I realize the KafkaProducer does not *technically* guarantee delivery
> >> order except when using transactions, which KafkaSystemProducer doesn't
> >> appear to do by default. I have checked the actual message envelope and
> >> when the ordering is wrong, the offset order is correct -- so, "done"
> >> was recorded by Kafka prior to "pending". This seems to rule out Samza
> >> but I'm not entirely confident in that conclusion. Any thoughts?
> >>
> >> Thanks,
> >>
> >> Tom
> >>
>


Re: [POSSIBLE PHISHING] Task Partition Commit Failed After Upgrade

2019-03-06 Thread Prateek Maheshwari
Hi Jeremiah,

The configuration you want to look for is:
'job.systemstreampartition.grouper.factory'. It should default to:
'org.apache.samza.container.grouper.stream.GroupByPartitionFactory'.
Can you check if you see this value in the configuration logged by
SamzaContainer during container start? You can grep for: "Using
configuration".

For context, there are two groupers for a Samza job. One that groups input
partitions into tasks (this one), and one that groups tasks into containers
(the one you mentioned above).
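
A simplified illustration of the two levels, in case it helps. This is not
Samza's implementation: the method names are made up, and the round-robin
assignment of tasks to containers is an assumption made only for the sketch.
Level one puts partition N of every input stream into a task named
"Partition N"; level two spreads those tasks over a fixed number of
containers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TwoLevelGroupingSketch {
    // Level 1: input partitions -> tasks. Partition p of every stream goes to
    // the task named "Partition p".
    static Map<String, List<String>> groupPartitionsIntoTasks(List<String> streams, int partitions) {
        Map<String, List<String>> tasks = new TreeMap<>();
        for (int p = 0; p < partitions; p++) {
            List<String> ssps = new ArrayList<>();
            for (String stream : streams) {
                ssps.add(stream + "-" + p);
            }
            tasks.put("Partition " + p, ssps);
        }
        return tasks;
    }

    // Level 2: tasks -> containers. Round robin is an assumption of this
    // sketch, chosen only to keep the example short.
    static Map<Integer, List<String>> groupTasksIntoContainers(List<String> taskNames, int containerCount) {
        Map<Integer, List<String>> containers = new TreeMap<>();
        for (int i = 0; i < taskNames.size(); i++) {
            containers.computeIfAbsent(i % containerCount, k -> new ArrayList<>()).add(taskNames.get(i));
        }
        return containers;
    }

    public static void main(String[] args) {
        Map<String, List<String>> tasks = groupPartitionsIntoTasks(Arrays.asList("input"), 4);
        Map<Integer, List<String>> containers =
            groupTasksIntoContainers(new ArrayList<>(tasks.keySet()), 2);
        System.out.println(tasks);
        System.out.println(containers);
    }
}
```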

Thanks,
Prateek



On Wed, Mar 6, 2019 at 8:14 AM Jeremiah Adams 
wrote:

> It appears that the issue is related to the KafkaCheckpointLogKey.java
> constructor. grouperFactoryClassName here is null. The documentation
> indicates that task.name.grouper.factory config setting has a default value
> of
>  org.apache.samza.container.grouper.task.GroupByContainerCountFactory. I
> wouldn't expect it to be null here.
>
> If I specify GroupByContainerCountFactory for the
> task.name.grouper.factory in my properties file, I get a
> NoSuchMethodException:
>
> Exception in thread "main" java.lang.InstantiationException:
> org.apache.samza.container.grouper.task.GroupByContainerCount
> at java.lang.Class.newInstance(Class.java:427)
> at org.apache.samza.util.Util$.getObj(Util.scala:80)
> at
> org.apache.samza.coordinator.JobModelManager$.readJobModel(JobModelManager.scala:261)
> at
> org.apache.samza.coordinator.JobModelManager$.getJobModelManager(JobModelManager.scala:155)
> at
> org.apache.samza.coordinator.JobModelManager$.apply(JobModelManager.scala:117)
> at
> org.apache.samza.coordinator.JobModelManager.apply(JobModelManager.scala)
> at
> org.apache.samza.clustermanager.ClusterBasedJobCoordinator.buildJobModelManager(ClusterBasedJobCoordinator.java:241)
> at
> org.apache.samza.clustermanager.ClusterBasedJobCoordinator.<init>(ClusterBasedJobCoordinator.java:152)
> at
> org.apache.samza.clustermanager.ClusterBasedJobCoordinator.main(ClusterBasedJobCoordinator.java:297)
> Caused by: java.lang.NoSuchMethodException:
> org.apache.samza.container.grouper.task.GroupByContainerCount.<init>()
> at java.lang.Class.getConstructor0(Class.java:3082)
> at java.lang.Class.newInstance(Class.java:412)
> ... 8 more
>
>
>
> Jeremiah Adams
> Software Engineer
> www.helixeducation.com 
> Blog  | Twitter <
> https://twitter.com/HelixEducation> | Facebook <
> https://www.facebook.com/HelixEducation> | LinkedIn <
> http://www.linkedin.com/company/3609946>
>
>
> On 3/4/19, 2:48 PM, "Jeremiah Adams"  wrote:
>
> I am updating dependencies and moving from Samza V0.13.0 to V0.14.0.
> I develop locally using the grid app in the hello-samza project to spin up
> local yarn/zookeeper/kafka instances.
>
> Grid is running these versions:
> kafka_2.11-0.10.2.1.tgz
> hadoop-2.6.1.tar.gz
> zookeeper-3.4.3.tar.gz
>
>
> My job is now failing with the NPE below. anyone have ideas on the
> cause of this error?
>
>
> 2019-03-04 14:13:49 AsyncRunLoop [ERROR] Task Partition 0 commit failed
> java.lang.NullPointerException
> at
> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:782)
> at
> org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey.<init>(KafkaCheckpointLogKey.java:46)
> at
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.writeCheckpoint(KafkaCheckpointManager.scala:136)
> at
> org.apache.samza.checkpoint.OffsetManager.writeCheckpoint(OffsetManager.scala:259)
> at
> org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:205)
> at
> org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker$5.run(AsyncRunLoop.java:494)
> at
> org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.commit(AsyncRunLoop.java:513)
> at
> org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:379)
> at
> org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:314)
> at
> org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:228)
> at
> org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:157)
> at
> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:728)
> at
> org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:102)
> at
> org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:147)
> 2019-03-04 14:13:49 AsyncRunLoop [ERROR] Caught throwable and stopping
> run loop
>
>
>
> Jeremiah Adams
> Software Engineer
>
> https://url.emailprotection.link/?ahfhEufaAWbezBrUFPG98ZJcterGfIerU3ZwsA3Gv_C0~
> <
> https://url.emailprotection.link/?a49H2rNGIIBtQOw6md8OcHp-qKE3Xn2gNiZ3dlqAeSDA~
> >
> Blog<
> https://url.emailprotection.link/?a49H2rNGIIBtQOw6md8OcHgFEZu-KYuiu

Re: [DISCUSS] Samza 1.1.0 release

2019-03-07 Thread Prateek Maheshwari
Daniel, let's try to include the following change in the release as well.
SAMZA-2116: Make sendTo and sink operators non-terminal

Other than that, +1 (binding).

- Prateek



On Thu, Mar 7, 2019 at 9:22 AM Xinyu Liu  wrote:

> +1 (binding)
>
> Thanks,
> Xinyu
>
> On Thu, Mar 7, 2019 at 12:43 AM santhosh venkat <
> santhoshvenkat1...@gmail.com> wrote:
>
> > +1 (non-binding)
> >
> > Thanks,
> >
> > On Wed, Mar 6, 2019 at 10:42 PM Yi Pan  wrote:
> >
> > > +1 (binding)
> > >
> > > On Wed, Mar 6, 2019 at 10:08 PM Daniel Chen  wrote:
> > >
> > > > Hello everyone,
> > > >
> > > > We have added couple of major features to master since 1.0.0 that
> > > warrants
> > > > a major release.
> > > >
> > > > Within LinkedIn, some of these features have already been tested as
> > part
> > > of
> > > > our test suites. We plan to continue our testing in coming weeks to
> > > > validate the stability prior to release.
> > > >
> > > > Here is the highlighted list of features that are part of the new
> > release
> > > > (in chronological order)
> > > > SAMZA-1981
> > > > Consolidate table descriptors to samza-api
> > > > SAMZA-1985
> > > > Implement Startpoints model and StartpointManager
> > > > SAMZA-1998
> > > > Table API refactoring
> > > > SAMZA-2012
> > > > Add API for wiring an external context through to application
> > processing
> > > > code
> > > > SAMZA-2041
> > > > Add system descriptors for HDFS and Kinesis
> > > > SAMZA-2043
> > > > Consolidate ReadableTable and ReadWriteTable
> > > > SAMZA-2106
> > > > Samza App & Job Config Refactor
> > > > SAMZA-2081
> > > > Samza SQL : Type system for Samza SQL
> > > >
> > > > You can find a complete list of features here:
> > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SAMZA%20AND%20resolution%20%20%3D%20Fixed%20%20AND%20(fixVersion%20%3E%3D%201.1%20)%20ORDER%20BY%20createdDate%20%20DESC
> > > >
> > > > Here is my proposal on our release schedule and timelines.
> > > >
> > > >1. Cut a release version 1.1.0 from master
> > > >2. Target a release vote on the week March 13th (next week)
> > > >
> > > > Thoughts?
> > > >
> > > > Thanks,
> > > > Daniel
> > > >
> > >
> >
>


Re: [POSSIBLE PHISHING] Task Partition Commit Failed After Upgrade

2019-03-07 Thread Prateek Maheshwari
Jeremiah, were you able to resolve this issue?

- Prateek

On Wed, Mar 6, 2019 at 10:08 AM Prateek Maheshwari 
wrote:

> Hi Jeremiah,
>
> The configuration you want to look for is:
> 'job.systemstreampartition.grouper.factory'. It should default to:
> 'org.apache.samza.container.grouper.stream.GroupByPartitionFactory'.
> Can you check if you see this value in the configuration logged by
> SamzaContainer during container start? You can grep for: "Using
> configuration".
>
> For context, there are two groupers for a Samza job. One that groups input
> partitions into tasks (this one), and one that groups tasks into containers
> (the one you mentioned above).
>
> Thanks,
> Prateek
>
>
>
> On Wed, Mar 6, 2019 at 8:14 AM Jeremiah Adams 
> wrote:
>
>> It appears that the issue is related to the KafkaCheckpointLogKey.java
>> constructor. grouperFactoryClassName here is null. The documentation
>> indicates that task.name.grouper.factory config setting has a default value
>> of
>>  org.apache.samza.container.grouper.task.GroupByContainerCountFactory. I
>> wouldn't expect it to be null here.
>>
>> If I specify GroupByContainerCountFactory for the
>> task.name.grouper.factory in my properties file, I get a
>> NoSuchMethodException:
>>
>> Exception in thread "main" java.lang.InstantiationException:
>> org.apache.samza.container.grouper.task.GroupByContainerCount
>> at java.lang.Class.newInstance(Class.java:427)
>> at org.apache.samza.util.Util$.getObj(Util.scala:80)
>> at
>> org.apache.samza.coordinator.JobModelManager$.readJobModel(JobModelManager.scala:261)
>> at
>> org.apache.samza.coordinator.JobModelManager$.getJobModelManager(JobModelManager.scala:155)
>> at
>> org.apache.samza.coordinator.JobModelManager$.apply(JobModelManager.scala:117)
>> at
>> org.apache.samza.coordinator.JobModelManager.apply(JobModelManager.scala)
>> at
>> org.apache.samza.clustermanager.ClusterBasedJobCoordinator.buildJobModelManager(ClusterBasedJobCoordinator.java:241)
>> at
>> org.apache.samza.clustermanager.ClusterBasedJobCoordinator.<init>(ClusterBasedJobCoordinator.java:152)
>> at
>> org.apache.samza.clustermanager.ClusterBasedJobCoordinator.main(ClusterBasedJobCoordinator.java:297)
>> Caused by: java.lang.NoSuchMethodException:
>> org.apache.samza.container.grouper.task.GroupByContainerCount.<init>()
>> at java.lang.Class.getConstructor0(Class.java:3082)
>> at java.lang.Class.newInstance(Class.java:412)
>> ... 8 more
>>
>>
>>
>> Jeremiah Adams
>> Software Engineer
>> www.helixeducation.com <http://www.helixeducation.com/>
>> Blog <http://www.helixeducation.com/blog/> | Twitter <
>> https://twitter.com/HelixEducation> | Facebook <
>> https://www.facebook.com/HelixEducation> | LinkedIn <
>> http://www.linkedin.com/company/3609946>
>>
>>
>> On 3/4/19, 2:48 PM, "Jeremiah Adams"  wrote:
>>
>> I am updating dependencies and moving from Samza V0.13.0 to V0.14.0.
>> I develop locally using the grid app in the hello-samza project to spin up
>> local yarn/zookeeper/kafka instances.
>>
>> Grid is running these versions:
>> kafka_2.11-0.10.2.1.tgz
>> hadoop-2.6.1.tar.gz
>> zookeeper-3.4.3.tar.gz
>>
>>
>> My job is now failing with the NPE below. anyone have ideas on the
>> cause of this error?
>>
>>
>> 2019-03-04 14:13:49 AsyncRunLoop [ERROR] Task Partition 0 commit
>> failed
>> java.lang.NullPointerException
>> at
>> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:782)
>> at
>> org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey.<init>(KafkaCheckpointLogKey.java:46)
>> at
>> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.writeCheckpoint(KafkaCheckpointManager.scala:136)
>> at
>> org.apache.samza.checkpoint.OffsetManager.writeCheckpoint(OffsetManager.scala:259)
>> at
>> org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:205)
>> at
>> org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker$5.run(AsyncRunLoop.java:494)
>> at
>> org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.commit(AsyncRunLoop.java:513)
>> at
>> org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:379)
>> at
>> org.apache.samza.task.AsyncRunLoop$AsyncTask

Re: "send to" ordering is inconsistent

2019-03-07 Thread Prateek Maheshwari
Hi Tom,

It looks like we won't be able to include SAMZA-2116 in the upcoming 1.1
release due to time constraints. It'll have to go into the 1.2 release,
which will tentatively be in June. Does that still work for you?

Thanks,
Prateek

On Thu, Feb 28, 2019 at 2:16 PM Tom Davis  wrote:

> Thanks, Prateek! Yes, the workaround will be fine for the time being.
> Thank you again!
>
> Prateek Maheshwari  writes:
>
> > Hi Tom,
> >
> > Thanks for reporting this. I created a ticket (SAMZA-2116
> > <https://issues.apache.org/jira/browse/SAMZA-2116>) to make the required
> > API changes. We'll include this in the next Samza release, which should
> be
> > mid to late next month.
> >
> > In the mean time, the workaround would be to keep all of this
> functionality
> > in a sink function. Does this work for you?
> >
> > Thanks,
> > Prateek
> >
> > On Wed, Feb 27, 2019 at 2:54 PM Tom Davis 
> wrote:
> >
> >>
> >> Prateek Maheshwari  writes:
> >>
> >> > Hi Tom,
> >> >
> >> > I'm assuming that the two sub-DAGs you're talking about are the two
> Map
> >> ->
> >> > Send To chains acting on the "audit-report-requests" input and sending
> >> > their results to the "audit-report-status" output.
> >> >
> >>
> >> Yes, that's correct.
> >>
> >> >
> >> > Although processing within each Task is in-order, the framework does
> not
> >> > guarantee the order in which the multiple chained operators for an
> >> operator
> >> > are evaluated. Specifically, in the current implementation
> >> > <
> >>
> https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java#L106
> >> >,
> >> > an Operator's registeredOperators are maintained as a HashSet of
> >> > OperatorImpls. This would explain the out-of-order appearance of the
> two
> >> > messages. I'm not sure what's changed in 1.0 that makes this trigger
> now.
> >> >
> >>
> >> Ah! I thought this was the case but I couldn't find the part of the code
> >> to prove it. This makes far more sense than Kafka routinely not
> >> committing messages in order (though it is still technically a
> >> possibility).
> >>
> >> Upon further investigation, I'm not convinced it's a 1.0 issue; I think
> >> we just started using multiple chained operators more heavily.
> >>
> >> >
> >> > Since both sendTo and sink are terminal operators (void return type),
> I
> >> > don't think you'll be able to easily get around this. Let me discuss
> this
> >> > with the team and get back to you with a workaround / fix.
> >> >
> >>
> >> Thanks a lot! <3
> >>
> >> >
> >> > Thanks,
> >> > Prateek
> >> >
> >> >
> >> > On Tue, Feb 26, 2019 at 7:08 PM Tom Davis 
> >> wrote:
> >> >
> >> >> Hey folks!
> >> >>
> >> >> We have noticed some inconsistencies in message ordering when running a
> >> >> StreamApplication that calls two separate `map` functions over an input
> >> >> and sends results to the same output. I have attached my Execution Plan,
> >> >> but the gist is that the first `map` function marks a thing as "pending"
> >> >> by sending a message to a status topic and the second `map` function
> >> >> does some work then sends its own status with "done".
> >> >>
> >> >> We have a test set up to read the resulting status topic with a normal
> >> >> Kafka consumer to ensure that two status messages were produced by Samza
> >> >> and consumed in the proper order (first "pending", then "done", per the
> >> >> order of the MessageStream call chains). This test flaps pretty
> >> >> routinely since upgrading to Samza 1.0; we never noticed this in the
> >> >> past. Sometimes, it times out waiting for any messages, though that's
> >> >> considerably less rare than the ordering issue. My understanding is, for
> >> >> a given Task, by default, all processing should be done serially. Is
> >> >> that no longer true? Is the guarantee *only* for the order in which
> >> >> messages are consumed, not produced?
> >> >>
> >> >> For test simplicity, there's a single Kafka partition for each topic and
> >> >> I attempted to create a configuration file that would eliminate as many
> >> >> coordination and concurrency sources as I knew how:
> >> >>
> >> >>   processor.id=0
> >> >>   job.coordinator.factory=org.apache.samza.standalone.PassthroughJobCoordinatorFactory
> >> >>   job.container.single.thread.mode=true
> >> >>
> >> >> (We use the ZkJobCoordinatorFactory normally but both produce the bug)
> >> >>
> >> >> I realize the KafkaProducer does not *technically* guarantee delivery
> >> >> order except when using transactions, which KafkaSystemProducer doesn't
> >> >> appear to do by default. I have checked the actual message envelope and
> >> >> when the ordering is wrong, the offset order is correct -- so, "done"
> >> >> was recorded by Kafka prior to "pending". This seems to rule out Samza
> >> >> but I'm not entirely confident in that conclusion. Any thoughts?
> >> >>
> >> >> Thanks,
> >> >>
> >> >> Tom
> >> >>
> >>
>
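
The HashSet explanation in the thread above can be illustrated in isolation. The following is a minimal sketch, not Samza code: the class name, method name, and operator labels are hypothetical, and plain strings stand in for OperatorImpl instances. It only demonstrates the general Java behavior being discussed, namely that a HashSet makes no promise about iteration order while a LinkedHashSet iterates in insertion order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a registry of chained operators; not Samza's OperatorImpl.
class OperatorOrderSketch {

    // Adds the names to the supplied set in registration order, then returns
    // the order in which iterating over that set visits them.
    static List<String> visitOrder(Set<String> registry, List<String> registrationOrder) {
        registry.addAll(registrationOrder);
        return new ArrayList<>(registry);
    }

    public static void main(String[] args) {
        List<String> registered =
                Arrays.asList("map-pending", "sendTo-status", "map-work", "sink-done");

        // HashSet: iteration order is driven by hash codes, not by insertion order.
        System.out.println("HashSet:       " + visitOrder(new HashSet<>(), registered));

        // LinkedHashSet: iteration order always matches insertion order.
        System.out.println("LinkedHashSet: " + visitOrder(new LinkedHashSet<>(), registered));
    }
}
```

With string elements the HashSet order is stable from run to run but unrelated to insertion order. For objects that rely on the default identity hash code, the order may also differ between runs, which would be consistent with a test that fails only intermittently.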


Re: Backing Kafka/Yarn/Zookeeper version for Samza 1.0.0?

2019-03-11 Thread Prateek Maheshwari
Hi Jeremiah,

We're in the process of upgrading Samza to use Kafka clients version 2.0,
and YARN client version 2.9. This should be available in the next release
(version 1.2). In the meantime, Kafka 0.11 and YARN 2.6 / 2.7 are the
recommended versions.

Can you clarify what you mean about hard requirements for Zookeeper since
checkpointing?

Thanks,
Prateek

On Fri, Mar 8, 2019 at 11:27 AM Jeremiah Adams 
wrote:

> I am in the process of updating our stack. I’m not seeing documentation on
> what versions of Kafka, Zookeeper and Yarn Samza 1.0.0 should be run on.
> Best I can tell is a 2.6.1 dependency on Hadoop jars and 0.11.0.2 kafka
> jars. If these jars are meant to match versions of the software, they are
> dated. Hadoop is up to version 3.2.0 and Kafka is up to version 2.1.1.
>
> Any advice concerning versions of the components? Also we are planning on
> moving all Samza jobs to standalone and eliminating the Yarn dependencies.
> I prefer taking iterative steps so want to get the updates done before
> moving to containers.
>
> Also, does Samza have any hard requirements for Zookeeper since
> checkpointing was implemented?
>
>
> Jeremiah Adams
> Software Engineer
> www.helixeducation.com
> Blog | Twitter<
> https://twitter.com/HelixEducation> | Facebook<
> https://www.facebook.com/HelixEducation> | LinkedIn<
> http://www.linkedin.com/company/3609946>
>
>


Re: Backing Kafka/Yarn/Zookeeper version for Samza 1.0.0?

2019-03-14 Thread Prateek Maheshwari
Hi Jeremiah,

If you're using YARN, it requires ZK for itself. You'll also need ZK if
you're using Standalone with the ZKJobCoordinator. Other than that the
framework does not rely on ZK for anything.

Hope that helps.

- Prateek

On Tue, Mar 12, 2019 at 7:23 AM Jeremiah Adams 
wrote:

> Is zookeeper required for the tech stack? If samza does all of its
> checkpointing on kafka, I may be able to save us some money by eliminating
> the ZK cluster.
>
>
>
>
>
>
> * Jeremiah Adams*
>
> Software Engineer
>
> www.helixeducation.com
>
> Blog <http://www.helixeducation.com/blog/> | Twitter
> <https://twitter.com/HelixEducation> | Facebook
> <https://www.facebook.com/HelixEducation> | LinkedIn
> <http://www.linkedin.com/company/3609946>
>
>
>
>
>
> *From: *Prateek Maheshwari 
> *Date: *Monday, March 11, 2019 at 6:31 PM
> *To: *"dev@samza.apache.org" , Jeremiah Adams <
> jad...@helixeducation.com>
> *Subject: *Re: Backing Kafka/Yarn/Zookeeper version for Samza 1.0.0?
>
>
>
> Hi Jeremiah,
>
>
>
> We're in the process of upgrading Samza to use Kafka clients version 2.0,
> and YARN client version 2.9. This should be available in the next release
> (version 1.2). In the meantime, Kafka 0.11 and YARN 2.6 / 2.7 are the
> recommended versions.
>
>
>
> Can you clarify what you mean about hard requirements for Zookeeper since
> checkpointing?
>
>
>
> Thanks,
> Prateek
>
>
>
> On Fri, Mar 8, 2019 at 11:27 AM Jeremiah Adams 
> wrote:
>
> I am in the process of updating our stack. I’m not seeing documentation on
> what versions of Kafka, Zookeeper and Yarn Samza 1.0.0 should be run on.
> Best I can tell is a 2.6.1 dependency on Hadoop jars and 0.11.0.2 kafka
> jars. If these jars are meant to match versions of the software, they are
> dated. Hadoop is up to version 3.2.0 and Kafka is up to version 2.1.1.
>
> Any advice concerning versions of the components? Also we are planning on
> moving all Samza jobs to standalone and eliminating the Yarn dependencies.
> I prefer taking iterative steps so want to get the updates done before
> moving to containers.
>
> Also, does Samza have any hard requirements for Zookeeper since
> checkpointing was implemented?
>
>
> Jeremiah Adams
> Software Engineer
> www.helixeducation.com
> <http://www.helixeducation.com/>
> Blog <http://www.helixeducation.com/blog/> | Twitter
> <https://twitter.com/HelixEducation> | Facebook
> <https://www.facebook.com/HelixEducation> | LinkedIn
> <http://www.linkedin.com/company/3609946>
>
>


Re: Error handling

2019-03-15 Thread Prateek Maheshwari
Hi Tom,

This would depend on what your k8s container orchestration logic looks
like. For example, in YARN, 'status' returns 'not running' after 'start'
until all the containers requested from the AM are 'running'. We also
leverage YARN to restart containers/job automatically on failures (within
some bounds). Additionally, we set up a monitoring alert that goes off if
the number of running containers stays lower than the number of expected
containers for extended periods of time (~ 5 minutes).

Are you saying that you noticed that the LocalApplicationRunner status
returns 'running' even if its stream processor / SamzaContainer has stopped
processing?

- Prateek

On Fri, Mar 15, 2019 at 7:26 AM Tom Davis  wrote:

> I'm using the LocalApplicationRunner and had added a liveness check
> around the `status` method. The app is running in Kubernetes so, in
> theory, it could be restarted if exceptions happened during processing.
> However, it seems that "container failure" is divorced from "app
> failure" because the app continues to run even after all the task
> containers have shut down. Is there a better way to check for
> application health? Is there a way to shut down the application if all
> containers have failed? Should I simply ensure exceptions never escape
> operators? Thanks!
>


Re: [VOTE] Apache Samza 1.1.0 RC2

2019-03-18 Thread Prateek Maheshwari
1. Verified checksum and signatures for the binaries.
2. Ran ./check-all.sh
3. Ran YARN and Standalone integration tests with the config patch
successfully.

+1(binding) from my side as well.

Thanks,
Prateek

On Mon, Mar 18, 2019 at 2:06 PM Jagadish Venkatraman 
wrote:

> 1. Verified check-sum and signatures for the release binaries.
> 2. Ran ./check-all.sh successfully
> 3. Ran YARN integration tests successfully
> 4. Encountered an error on the standalone integration test, but it
> succeeded after setting Kafka's replication factor config to 1.
>
> +1(binding) from my side.
>
> Thanks Daniel Chen and Shanthoosh for shepherding Samza 1.1.0!
>
> On Mon, Mar 18, 2019 at 9:47 AM Jake Maes  wrote:
>
> > Verified with check-all on RHEL 7
> >
> > Verified pgp and sha.
> >
> > +1 (binding)
> >
> > On Fri, Mar 15, 2019 at 11:39 AM rayman preet 
> > wrote:
> >
> > > +1 (Non-binding)
> > >
> > > --
> > > thanks
> > > rayman
> > >
> > > On Wed, Mar 13, 2019 at 7:17 PM Daniel Chen  wrote:
> > >
> > > > Hi,
> > > >
> > > > I performed the following verifications:
> > > >
> > > > 1. ./bin/check-all.sh succeeded.
> > > >
> > > > > 2. Verified both ./bin/integration-tests.sh yarn-integration-tests and
> > > > > ./bin/integration-tests.sh standalone-integration-tests succeeded.
> > > >
> > > > > 3. Verified that the SQL console is available in samza-tool.tgz.
> > > >
> > > > +1 (Non-binding)
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Daniel
> > > >
> > > >
> > > > On Tue, Mar 12, 2019 at 4:11 PM santhosh venkat <
> > > > santhoshvenkat1...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > > This is a call for a vote on a release of Apache Samza 1.1.0. Thanks to
> > > > > > everyone who has contributed to this release.
> > > > >
> > > > > The release candidate can be downloaded from here:
> > > > > http://home.apache.org/~shanthoosh/samza-1.1.0-rc2/
> > > > >
> > > > > > The release candidate is signed with pgp key 0xF8B95961A401BF0F, which can
> > > > > > be found at:
> > > > > > http://keyserver.ubuntu.com/pks/lookup?op=get&search=0xF8B95961A401BF0F
> > > > > >
> > > > > > The git tag is release-1.1.0-rc2 and signed with the same pgp key:
> > > > > > https://gitbox.apache.org/repos/asf?p=samza.git;a=tag;h=refs/tags/release-1.1.0-rc2
> > > > >
> > > > > > Test binaries have been published to Maven's staging repository, and are
> > > > > > available here:
> > > > > > https://repository.apache.org/content/repositories/orgapachesamza-1060/
> > > > >
> > > > > > The vote will be open for 72 hours (ending at 4:30 PM PST Friday,
> > > > > > 03/15/2019).
> > > > > > Please download the release candidate, check the hashes/signature, build it
> > > > > > and test it, and then please vote:
> > > > > [ ] +1 approve
> > > > >
> > > > > [ ] +0 no opinion
> > > > >
> > > > > [ ] -1 disapprove (and reason why)
> > > > >
> > > > > I ran check-all.sh, integration tests and verified the SQL console
> > > > > in samza-tool tgz.
> > > > >
> > > > > +1 (non-binding) from my side.
> > > > >
> > > > > Thanks,
> > > > >
> > > >
> > >
> > >
> > > --
> > > thanks
> > > rayman
> > >
> >
>
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University
>


Re: Error handling

2019-03-22 Thread Prateek Maheshwari
Hi Tom,

This sounds like a bug. ApplicationRunner should return the correct status
when the processor has shut down. We fixed a similar standalone bug
recently; are you already using Samza 1.0?
If this is reproducible / happens again, a thread dump + logs would also be
very helpful for debugging and verifying if the issue is already fixed.

Thanks,
Prateek

On Fri, Mar 22, 2019 at 7:23 AM Tom Davis  wrote:

>
> Prateek Maheshwari  writes:
>
> > Hi Tom,
> >
> > This would depend on what your k8s container orchestration logic looks
> > like. For example, in YARN, 'status' returns 'not running' after 'start'
> > until all the containers requested from the AM are 'running'. We also
> > leverage YARN to restart containers/job automatically on failures (within
> > some bounds). Additionally, we set up a monitoring alert that goes off if
> > the number of running containers stays lower than the number of expected
> > containers for extended periods of time (~ 5 minutes).
> >
> > Are you saying that you noticed that the LocalApplicationRunner status
> > returns 'running' even if its stream processor / SamzaContainer has stopped
> > processing?
> >
>
> Yeah, this is what I mean. We have a health check for the overall
> ApplicationStatus but if the containers enter a failed state that
> doesn't result in a shut down of the runner itself. An example from last
> night: Kafka became unavailable at some point and Samza failed to write
> checkpoints for a while, ultimately leading to container failures. The
> last log line is:
>
> o.a.s.c.SamzaContainer - Shutdown is no-op since the container is already in state: FAILED
>
> This doesn't cause the Pod to be killed, though, so we just silently
> stop processing events. How do you determine the number of expected
> containers? Or are you speaking of containers in terms of YARN and not
> Samza processors?
>
> >
> > - Prateek
> >
> > On Fri, Mar 15, 2019 at 7:26 AM Tom Davis wrote:
> >
> >> I'm using the LocalApplicationRunner and had added a liveness check
> >> around the `status` method. The app is running in Kubernetes so, in
> >> theory, it could be restarted if exceptions happened during processing.
> >> However, it seems that "container failure" is divorced from "app
> >> failure" because the app continues to run even after all the task
> >> containers have shut down. Is there a better way to check for
> >> application health? Is there a way to shut down the application if all
> >> containers have failed? Should I simply ensure exceptions never escape
> >> operators? Thanks!
> >>
>


Fwd: SSL with Samza 0.14.1?

2019-03-25 Thread Prateek Maheshwari
Forwarding again. Original email did not show up on the OSS mailing list.

-- Forwarded message -
From: Deshpande, Omkar 
Date: Fri, Mar 22, 2019 at 5:08 PM
Subject: Fwd: SSL with Samza 0.14.1?
To: prateek...@gmail.com 


++Prateek gmail
--
*From:* LeVeck, Matt
*Sent:* Thursday, March 21, 2019 10:33:11 PM
*To:* dev@samza.apache.org; pmaheshw...@linkedin.com; Deshpande, Omkar;
Audo, Nicholas
*Subject:* SSL with Samza 0.14.1?


Prateek, Samza dev team,

This is Matt from Intuit.  We met briefly at the beginning of this
week’s meetup.  I’m wondering if you could help give us some guidance on
Kafka SSL with Samza.  Here, I’m talking about the Kafka cluster that Samza
uses to store checkpoints, etc.  We’re trying to connect to a cluster that
has SSL enabled, and we’re getting some errors that are indicative of SSL
connectivity failing.  It might just be that our properties file isn’t
correct.  But we’re wondering if there is another possibility. This
indicates that Samza 0.14.1 uses Kafka 0.11 which should have SSL support.
But Samza 0.14.1 also requires access to zookeeper for its consumer client,
which is indicative of older clients (see
https://samza.apache.org/learn/documentation/0.14/jobs/configuration-table.html#kafka).
Is it possible that Samza 0.14.1 doesn’t support SSL for Kafka when
creating its checkpoint topics?

Anyways, I’m hoping that’s not the case, and either our config is wrong or
we’re doing something else wrong.  Here is our properties snippet in case
we’ve messed up the config key names.  Any guidance is appreciated.


# Kafka System

systems.kafka.zookeeper.connect=sppzookeeper.data-lake-dev.a.intuit.com:2181,sppzookeeper.data-lake-dev.a.intuit.com:2182,sppzookeeper.data-lake-dev.a.intuit.com:2183

systems.kafka.security.protocol=SSL

systems.kafka.ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1

systems.kafka.ssl.truststore.type=JKS

systems.kafka.ssl.truststore.location=/home/appuser/spp/kabini.jks

systems.kafka.ssl.truststore.password=Intuit01

systems.kafka.bootstrap.servers=sppkafka.data-lake-dev.a.intuit.com:19701,sppkafka.data-lake-dev.a.intuit.com:19801,sppkafka.data-lake-dev.a.intuit.com:19901

systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory



We’ve also tried adding producer and consumer specific entries:



systems.kafka.producer.security.protocol=SSL

systems.kafka.producer.ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1

systems.kafka.producer.ssl.truststore.type=JKS

systems.kafka.producer.ssl.truststore.location=/home/appuser/spp/kabini.jks

systems.kafka.producer.ssl.truststore.password=Intuit01

systems.kafka.producer.bootstrap.servers=sppkafka.data-lake-dev.a.intuit.com:19701,sppkafka.data-lake-dev.a.intuit.com:19801,sppkafka.data-lake-dev.a.intuit.com:19901

systems.kafka.consumer.zookeeper.connect=sppzookeeper.data-lake-dev.a.intuit.com:2181,sppzookeeper.data-lake-dev.a.intuit.com:2182,sppzookeeper.data-lake-dev.a.intuit.com:2183

systems.kafka.consumer.security.protocol=SSL

systems.kafka.consumer.ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1

systems.kafka.consumer.ssl.truststore.type=JKS

systems.kafka.consumer.ssl.truststore.location=/home/appuser/spp/kabini.jks

systems.kafka.consumer.ssl.truststore.password=Intuit01

systems.kafka.consumer.bootstrap.servers=sppkafka.data-lake-dev.a.intuit.com:19701,sppkafka.data-lake-dev.a.intuit.com:19801,sppkafka.data-lake-dev.a.intuit.com:19901

systems.kafka.zookeeper.connect=sppzookeeper.data-lake-dev.a.intuit.com:2181,sppzookeeper.data-lake-dev.a.intuit.com:2182,sppzookeeper.data-lake-dev.a.intuit.com:2183

systems.kafka.security.protocol=SSL

systems.kafka.ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1

systems.kafka.ssl.truststore.type=JKS

systems.kafka.ssl.truststore.location=/home/appuser/spp/kabini.jks

systems.kafka.ssl.truststore.password=Intuit01

systems.kafka.bootstrap.servers=sppkafka.data-lake-dev.a.intuit.com:19701,sppkafka.data-lake-dev.a.intuit.com:19801,sppkafka.data-lake-dev.a.intuit.com:19901

Thanks,

Matt
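
For readers comparing the two property layouts above: the Samza configuration table linked in the message documents the `systems.<system-name>.producer.*` and `systems.<system-name>.consumer.*` namespaces as the places where Kafka client settings go. The sketch below illustrates that kind of prefix-based subsetting on a plain map. It is not Samza's implementation, the class and method names are hypothetical, the truststore path is a placeholder, and whether the un-prefixed `systems.kafka.security.protocol` style keys are honored is not established in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper showing prefix-based config subsetting; not Samza code.
class PrefixedConfigSketch {

    // Returns the entries whose key starts with the prefix, with the prefix removed.
    static Map<String, String> subset(Map<String, String> config, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("systems.kafka.security.protocol", "SSL"); // system-level key
        config.put("systems.kafka.producer.security.protocol", "SSL");
        config.put("systems.kafka.producer.ssl.truststore.type", "JKS");
        config.put("systems.kafka.producer.ssl.truststore.location", "/path/to/truststore.jks");

        // Only the producer-prefixed keys survive, as plain Kafka client keys.
        subset(config, "systems.kafka.producer.")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Printing the resulting subsets for the producer and consumer prefixes is a quick way to see which SSL settings a client built from them would actually receive.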

