Re: Review for SAMZA-1012

2016-10-06 Thread Tommy Becker

Thanks!

On 10/04/2016 11:18 AM, Jacob Maes wrote:

I'm on it!

On Tue, Oct 4, 2016 at 6:22 AM, Tommy Becker <tobec...@tivo.com> wrote:



Could I bother a committer for a review of this patch?


https://reviews.apache.org/r/51689/

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com



This email and any attachments may contain confidential and privileged
material for the sole use of the intended recipient. Any review, copying,
or distribution of this email (or any attachments) by others is prohibited.
If you are not the intended recipient, please contact the sender
immediately and permanently delete this email and any attachments. No
employee or agent of TiVo Inc. is authorized to conclude any binding
agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
Inc. may only be made by a signed written agreement.








Review for SAMZA-1012

2016-10-04 Thread Tommy Becker

Could I bother a committer for a review of this patch?


https://reviews.apache.org/r/51689/



Review Request 52030: Fix SAMZA-1018

2016-09-19 Thread Tommy Becker

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52030/
---

Review request for samza.


Bugs: SAMZA-1018
https://issues.apache.org/jira/browse/SAMZA-1018


Repository: samza


Description
---

Check error code from metadata fetch in getSystemStreamPartitionCounts to avoid 
returning no data for newly created topics.


Diffs
-

  
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ba8de5c

Diff: https://reviews.apache.org/r/52030/diff/


Testing
---

Tested manually


Thanks,

Tommy Becker



Re: Issue with consuming non-existent topics in 0.10.1

2016-09-16 Thread Tommy Becker

Hey Navina,

This was consistently reproducible both locally and in our integration test 
environment. We have auto.create.topics.enable on our brokers (or more 
accurately, we do not have it disabled; it's the default). I did not mean to 
imply there is a problem with the logic of the change in SAMZA-971; I 
understand the desire to make fewer calls, but at the time I wasn't able to 
dig in and find the exact root cause of the difference. I think I've found 
it now, though.

Prior to the 971 fix, we eventually wind up in 
KafkaSystemAdmin.getTopicsAndPartitionsByBroker(), which contains this code:

KafkaUtil.maybeThrowException(topicMetadata.errorCode)

What I found was that this was indeed throwing a LeaderNotAvailableException in 
the case where the topic did not already exist. This has the effect of 
triggering a retry in KafkaSystemAdmin.getSystemStreamMetadata(), and this 
continues until the broker has finished creating the topic and returns the 
correct partition metadata. The optimized path introduced by the SAMZA-971 fix 
goes into KafkaSystemAdmin.getSystemStreamPartitionCounts() which does not 
check this errorCode, and simply returns an empty set of partitions. Does that 
make sense?
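The difference between the two code paths described above can be sketched with a toy model (the names and error codes below are illustrative simplifications, not the actual Samza/Kafka API):

```java
// Toy model of the retry-on-error behavior described above (hypothetical
// names, not the real Samza/Kafka API). The pre-SAMZA-971 path checks the
// metadata error code and retries; the optimized path trusts whatever
// partition count the broker returned, even mid-topic-creation.
import java.util.Iterator;
import java.util.List;

public class MetadataFetchSketch {
    static final int NO_ERROR = 0;
    static final int LEADER_NOT_AVAILABLE = 5; // broker still creating the topic

    /** Returns a partition count only from an error-free metadata reply, retrying otherwise. */
    static int fetchPartitionCount(Iterator<int[]> replies, int maxRetries) {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            int[] reply = replies.next(); // {errorCode, partitionCount}
            if (reply[0] == NO_ERROR) {
                return reply[1]; // only trust metadata with no error code
            }
            // e.g. LEADER_NOT_AVAILABLE: topic is still being created; retry
        }
        throw new IllegalStateException("metadata fetch failed after " + maxRetries + " attempts");
    }

    public static void main(String[] args) {
        // First two replies report LeaderNotAvailable with no partitions; the
        // third is real metadata once the broker finishes creating the topic.
        Iterator<int[]> replies = List.of(
            new int[]{LEADER_NOT_AVAILABLE, 0},
            new int[]{LEADER_NOT_AVAILABLE, 0},
            new int[]{NO_ERROR, 4}).iterator();
        System.out.println(fetchPartitionCount(replies, 5)); // prints 4
    }
}
```

The path that skips the error-code check behaves like returning `reply[1]` unconditionally on the first call, which is the empty partition set described above.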


-Tommy






On 09/15/2016 09:54 PM, Navina Ramesh wrote:

Hi Tommy,

Yi and I discussed it, and initially we thought it could have
something to do with the topic auto-creation setting on your Kafka server.
Is it enabled or disabled in your case?

I suspect the request timeout may be insufficient. However, we do have
retries in Samza to fetch the metadata. So even if the topic gets
auto-created and the metadata fetch is delayed, it will try to fetch the
metadata again. It's not clear why SAMZA-971 has anything to do with this;
that JIRA just reduces the number of calls we make to the broker.

Another question: are you able to reproduce this issue?

Thanks!
Navina

On Wed, Sep 14, 2016 at 1:33 PM, Tommy Becker <tobec...@tivo.com> wrote:



Thanks for the response, and done.

https://issues.apache.org/jira/browse/SAMZA-1018

On 09/14/2016 01:14 PM, Yi Pan wrote:

Hi, Tommy,

Could you open a JIRA for this one? Also, could you include the Kafka
broker version in this test?

Thanks!

-Yi

On Wed, Sep 14, 2016 at 6:06 AM, Tommy Becker <tobec...@tivo.com> wrote:



We are testing an upgrade to 0.10.1 from 0.9.1 and noticed a regression.
When starting a stream job that consumes a topic that does not yet exist,
the job dies with the following exception:

Exception in thread "main" java.lang.IllegalArgumentException: No tasks
found. Likely due to no input partitions. Can't run a job with no tasks.
    at org.apache.samza.container.grouper.task.GroupByContainerCount.validateTasks(GroupByContainerCount.java:193)
    at org.apache.samza.container.grouper.task.GroupByContainerCount.balance(GroupByContainerCount.java:86)
    at org.apache.samza.coordinator.JobModelManager$.refreshJobModel(JobCoordinator.scala:278)
    at org.apache.samza.coordinator.JobModelManager$.jobModelGenerator$1(JobCoordinator.scala:211)
    at org.apache.samza.coordinator.JobModelManager$.initializeJobModel(JobCoordinator.scala:217)
    at org.apache.samza.coordinator.JobModelManager$.getJobCoordinator(JobCoordinator.scala:122)
    at org.apache.samza.coordinator.JobModelManager$.apply(JobCoordinator.scala:106)
    at org.apache.samza.coordinator.JobModelManager$.apply(JobCoordinator.scala:112)
    at org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:40)
    at org.apache.samza.job.JobRunner.run(JobRunner.scala:129)
    at org.apache.samza.job.JobRunner$.main(JobRunner.scala:66)
    at org.apache.samza.job.JobRunner.main(JobRunner.scala)





The root cause seems to be commit 920f803a2e3dab809f4d7bb518259b0f4164407f
from SAMZA-971. From what I can tell, passing partitionsMetadataOnly = true
to the StreamMetadataCache in JobModelManager#getInputStreamPartitions is
what causes this behavior. The input topic is still created, but the proper
partition metadata is not returned, so the job ends up with an empty set of
partitions. The behavior of Kafka here is screwy, but this still seems like
a regression. The old behavior is nice because it doesn't require that
producer systems come up before the stream processors.


Re: Issue with consuming non-existent topics in 0.10.1

2016-09-14 Thread Tommy Becker

Thanks for the response, and done.

https://issues.apache.org/jira/browse/SAMZA-1018

On 09/14/2016 01:14 PM, Yi Pan wrote:

Hi, Tommy,

Could you open a JIRA for this one? Also, could you include the Kafka
broker version in this test?

Thanks!

-Yi

On Wed, Sep 14, 2016 at 6:06 AM, Tommy Becker <tobec...@tivo.com> wrote:



We are testing an upgrade to 0.10.1 from 0.9.1 and noticed a regression.
When starting a stream job that consumes a topic that does not yet exist,
the job dies with the following exception:

Exception in thread "main" java.lang.IllegalArgumentException: No tasks
found. Likely due to no input partitions. Can't run a job with no tasks.
    at org.apache.samza.container.grouper.task.GroupByContainerCount.validateTasks(GroupByContainerCount.java:193)
    at org.apache.samza.container.grouper.task.GroupByContainerCount.balance(GroupByContainerCount.java:86)
    at org.apache.samza.coordinator.JobModelManager$.refreshJobModel(JobCoordinator.scala:278)
    at org.apache.samza.coordinator.JobModelManager$.jobModelGenerator$1(JobCoordinator.scala:211)
    at org.apache.samza.coordinator.JobModelManager$.initializeJobModel(JobCoordinator.scala:217)
    at org.apache.samza.coordinator.JobModelManager$.getJobCoordinator(JobCoordinator.scala:122)
    at org.apache.samza.coordinator.JobModelManager$.apply(JobCoordinator.scala:106)
    at org.apache.samza.coordinator.JobModelManager$.apply(JobCoordinator.scala:112)
    at org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:40)
    at org.apache.samza.job.JobRunner.run(JobRunner.scala:129)
    at org.apache.samza.job.JobRunner$.main(JobRunner.scala:66)
    at org.apache.samza.job.JobRunner.main(JobRunner.scala)





The root cause seems to be commit 920f803a2e3dab809f4d7bb518259b0f4164407f
from SAMZA-971. From what I can tell, passing partitionsMetadataOnly = true
to the StreamMetadataCache in JobModelManager#getInputStreamPartitions is
what causes this behavior. The input topic is still created, but the proper
partition metadata is not returned, so the job ends up with an empty set of
partitions. The behavior of Kafka here is screwy, but this still seems like
a regression. The old behavior is nice because it doesn't require that
producer systems come up before the stream processors.



Re: SIGSEGV in RocksDB when killing jobs

2016-09-14 Thread Tommy Becker

Thanks for the reply, Yi. It does indeed work if we remove the calls to 
KeyValueStore.close(). As for why we're doing that: again, mostly just 
housekeeping. I haven't seen any guidance either way on whether StreamTasks 
should close the stores they use, and was unaware this was automatic. We can 
definitely stop doing it, but it would seem prudent to handle this case 
regardless.

On 09/14/2016 01:12 PM, Yi Pan wrote:

Hi, Tommy,

Thanks for reporting this. We can definitely be more defensive in the code
here. I just wonder what the specific reason is for calling the RocksDB
store's close() explicitly, since SamzaContainer#shutdownStores already
calls flush() and close() automatically. Does it work for you if you remove
the explicit store close() calls in your CloseableTask implementation?

Thanks!

-Yi

On Wed, Sep 14, 2016 at 7:56 AM, Tommy Becker <tobec...@tivo.com> wrote:



While testing with Samza 0.10.1 I noticed the following crash whenever I
would kill a job that uses a RocksDB store:


# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x7eff66b6c27e, pid=20315, tid=139636974364416
#
# JRE version: Java(TM) SE Runtime Environment (8.0_51-b16) (build
1.8.0_51-b16)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.51-b03 mixed mode
linux-amd64 compressed oops)
# Problematic frame:
# C  [librocksdbjni2253915919401340417..so+0x11427e]  rocksdb_flush_helper(JNIEnv_*, rocksdb::DB*, rocksdb::FlushOptions const&, rocksdb::ColumnFamilyHandle*)+0x1e
#
# Failed to write core dump. Core dumps have been disabled. To enable core
dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /home/tommy/projects/ffs/ffs-stream-jobs/target/ffs-stream-jobs-8.1.4.0-SNAPSHOT-dist/ffs-stream-jobs/hs_err_pid20315.log
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
# The crash happened outside the Java Virtual Machine in native code.
# See problematic frame for where to report the bug.

I was able to tie this back to the RocksDB store being closed by both our
StreamTask and the SamzaContainer. We always close stores via
CloseableTask#close just for housekeeping purposes. Prior to this issue I
was not aware that this also happens automatically in
SamzaContainer#shutdownStores.  When closed, KeyValueStorageEngine first
calls flush() on the underlying store and there is no guard to ensure that
close has not already been called. The flush() call on a closed DB is what
seems to cause the crash. Obviously RocksDB should handle this more
gracefully, but I wonder if a patch is warranted for Samza also. Thoughts?



SIGSEGV in RocksDB when killing jobs

2016-09-14 Thread Tommy Becker

While testing with Samza 0.10.1 I noticed the following crash whenever I would 
kill a job that uses a RocksDB store:


# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x7eff66b6c27e, pid=20315, tid=139636974364416
#
# JRE version: Java(TM) SE Runtime Environment (8.0_51-b16) (build 1.8.0_51-b16)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.51-b03 mixed mode linux-amd64 
compressed oops)
# Problematic frame:
# C  [librocksdbjni2253915919401340417..so+0x11427e]  
rocksdb_flush_helper(JNIEnv_*, rocksdb::DB*, rocksdb::FlushOptions const&, 
rocksdb::ColumnFamilyHandle*)+0x1e
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try 
"ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /home/tommy/projects/ffs/ffs-stream-jobs/target/ffs-stream-jobs-8.1.4.0-SNAPSHOT-dist/ffs-stream-jobs/hs_err_pid20315.log
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
# The crash happened outside the Java Virtual Machine in native code.
# See problematic frame for where to report the bug.

I was able to tie this back to the RocksDB store being closed by both our 
StreamTask and the SamzaContainer. We always close stores via 
CloseableTask#close just for housekeeping purposes. Prior to this issue I was 
not aware that this also happens automatically in 
SamzaContainer#shutdownStores.  When closed, KeyValueStorageEngine first calls 
flush() on the underlying store and there is no guard to ensure that close has 
not already been called. The flush() call on a closed DB is what seems to cause 
the crash. Obviously RocksDB should handle this more gracefully, but I wonder 
if a patch is warranted for Samza also. Thoughts?
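A guard like the one suggested could look roughly like this (an illustrative sketch only; the real KeyValueStorageEngine API and class names differ):

```java
// Illustrative sketch of an idempotent close guard (not the actual
// KeyValueStorageEngine code). flush() on an already-closed store becomes
// a no-op instead of reaching into a closed native RocksDB handle.
public class GuardedStore {
    private boolean closed = false;
    int flushes = 0; // counts real flushes, for demonstration

    void flush() {
        if (closed) return; // no-op once closed: avoids flushing a closed DB
        flushes++;
    }

    void close() {
        if (closed) return; // a second close is now safe
        flush();            // flush pending writes exactly once
        closed = true;
    }

    public static void main(String[] args) {
        GuardedStore store = new GuardedStore();
        store.close(); // e.g. the task's own CloseableTask#close
        store.close(); // e.g. SamzaContainer#shutdownStores afterwards
        System.out.println(store.flushes); // prints 1
    }
}
```

With this shape, the double-close scenario described above (task closes the store, then the container closes it again) never triggers a flush on a closed store.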



Issue with consuming non-existent topics in 0.10.1

2016-09-14 Thread Tommy Becker

We are testing an upgrade to 0.10.1 from 0.9.1 and noticed a regression. When 
starting a stream job that consumes a topic that does not yet exist, the job 
dies with the following exception:

Exception in thread "main" java.lang.IllegalArgumentException: No tasks found. 
Likely due to no input partitions. Can't run a job with no tasks.
    at org.apache.samza.container.grouper.task.GroupByContainerCount.validateTasks(GroupByContainerCount.java:193)
    at org.apache.samza.container.grouper.task.GroupByContainerCount.balance(GroupByContainerCount.java:86)
    at org.apache.samza.coordinator.JobModelManager$.refreshJobModel(JobCoordinator.scala:278)
    at org.apache.samza.coordinator.JobModelManager$.jobModelGenerator$1(JobCoordinator.scala:211)
    at org.apache.samza.coordinator.JobModelManager$.initializeJobModel(JobCoordinator.scala:217)
    at org.apache.samza.coordinator.JobModelManager$.getJobCoordinator(JobCoordinator.scala:122)
    at org.apache.samza.coordinator.JobModelManager$.apply(JobCoordinator.scala:106)
    at org.apache.samza.coordinator.JobModelManager$.apply(JobCoordinator.scala:112)
    at org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:40)
    at org.apache.samza.job.JobRunner.run(JobRunner.scala:129)
    at org.apache.samza.job.JobRunner$.main(JobRunner.scala:66)
    at org.apache.samza.job.JobRunner.main(JobRunner.scala)





The root cause seems to be commit 920f803a2e3dab809f4d7bb518259b0f4164407f
from SAMZA-971. From what I can tell, passing partitionsMetadataOnly = true
to the StreamMetadataCache in JobModelManager#getInputStreamPartitions is
what causes this behavior. The input topic is still created, but the proper
partition metadata is not returned, so the job ends up with an empty set of
partitions. The behavior of Kafka here is screwy, but this still seems like
a regression. The old behavior is nice because it doesn't require that
producer systems come up before the stream processors.



Re: Review Request 51689: Fix SAMZA-1012 - Generated changelog mappings are not consistent

2016-09-13 Thread Tommy Becker

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51689/
---

(Updated Sept. 13, 2016, 8:26 a.m.)


Review request for samza.


Bugs: SAMZA-1012
https://issues.apache.org/jira/browse/SAMZA-1012


Repository: samza


Description
---

Fix SAMZA-1012. This patch sorts the results of the SSPGrouper so that 
changelog partitions are assigned in a predictable order.


Diffs
-

  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ba38b5c

Diff: https://reviews.apache.org/r/51689/diff/


Testing
---

Tested manually


Thanks,

Tommy Becker



How do you monitor Samza?

2016-09-12 Thread Tommy Becker

One of the challenges we've faced with deploying Samza is monitoring it 
effectively. We currently use the MetricsSnapshotReporter and consume the 
resulting stream, pulling out specific values that we want to monitor, but 
this approach isn't scalable. We're exploring pumping the metrics into an 
ELK stack, but the format isn't ideal (many keys contain dots, which ES 
doesn't like). Additionally, many of the keys are "dynamic" in that the 
names are determined by runtime configuration, which makes pulling specific 
values out a bit of a pain. I'd love to get some information on how other 
folks are monitoring their Samza jobs.
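For the dots-in-keys problem specifically, one workaround is to rewrite the keys before indexing. A minimal sketch (the sample key is illustrative, not an exact Samza metric name):

```java
// Minimal sketch: flatten metric keys before sending them to Elasticsearch,
// which treats dots in field names as object nesting. The sample key below
// is illustrative, not an exact Samza metric name.
public class MetricKeySanitizer {
    static String sanitize(String key) {
        return key.replace('.', '_'); // dots become underscores, one flat field name
    }

    public static void main(String[] args) {
        System.out.println(sanitize("org.apache.samza.container.SamzaContainerMetrics"));
        // prints org_apache_samza_container_SamzaContainerMetrics
    }
}
```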



Re: Question on changelog partition mapping

2016-09-09 Thread Tommy Becker

Done. https://issues.apache.org/jira/browse/SAMZA-1012

On 08/26/2016 06:57 PM, Yi Pan wrote:

Hi, Tommy,

It is perfectly fine. Would you please open a JIRA to include this
improvement?

Thanks!

-Yi

On Fri, Aug 26, 2016 at 6:11 AM, Tommy Becker <tobec...@tivo.com> wrote:



Hey Yi,

Apologies for the lateness of my reply. Yeah, that makes sense, and we can
certainly implement it. Would you consider accepting a PR that makes this
change to the standard groupers? It's just strange that the generated
partition mappings can vary like this, even for identical inputs.

-Tommy


On 08/16/2016 03:04 PM, Yi Pan wrote:

Hi, Tommy,

Yes. Now I understand what you referred to as "non-determinism". The
JobCoordinator was designed with the assumption that if no previous run is
found, we are free to start from scratch. I think the current solution you
can try is to implement a grouper that guarantees the order of groups
coming out of the group() method.

Does that make sense?

Thanks!

-Yi

On Fri, Aug 12, 2016 at 5:31 AM, Tommy Becker <tobec...@tivo.com> wrote:



Hi Yi,

Thanks for the response. We are running Samza 0.9.1, so we do not yet have
the coordinator stream. But to answer your other questions, the number of
task instances did not change. Specifically, none of the input topic, the
number of partitions in that topic, nor the grouper algorithm changed. The
non-determinism I am referring to can be seen here:

https://github.com/apache/samza/blob/0.9.1/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala#L141

Since we lost the original mapping, there is no previousChangelogeMapping
(sic) and the code creates a new mapping by simply assigning sequential
partition numbers from 0 to the number of tasks. But the order in which
these are assigned seems to be determined by the order of the TaskNames in
the map returned by the SystemStreamPartitionGrouper (i.e. no meaningful
order). So there is no guarantee that this code will produce the same
changelog mapping each time it runs, even if the number of tasks is the
same. Does that make sense? The code has changed somewhat since 0.9.1 but
seems to have the same issue even in 0.10.1.
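The deterministic assignment that the SAMZA-1012 patch aims for can be sketched as follows (a simplified illustration, not the actual JobCoordinator code):

```java
// Simplified sketch of deterministic changelog partition assignment (not
// the actual JobCoordinator code). Sorting the task names before numbering
// them yields the same mapping on every run for the same set of tasks,
// regardless of the grouper's map iteration order.
import java.util.*;

public class ChangelogMappingSketch {
    static Map<String, Integer> assignChangelogPartitions(Set<String> taskNames) {
        List<String> ordered = new ArrayList<>(taskNames);
        Collections.sort(ordered); // deterministic order, independent of set iteration order
        Map<String, Integer> mapping = new LinkedHashMap<>();
        for (int i = 0; i < ordered.size(); i++) {
            mapping.put(ordered.get(i), i); // sequential partitions 0..N-1
        }
        return mapping;
    }

    public static void main(String[] args) {
        // A HashSet has no meaningful iteration order, mirroring the grouper
        // output described above; sorting restores determinism.
        Map<String, Integer> m = assignChangelogPartitions(
            new HashSet<>(Arrays.asList("task-2", "task-0", "task-1")));
        System.out.println(m); // prints {task-0=0, task-1=1, task-2=2}
    }
}
```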

-Tommy

On 08/11/2016 06:12 PM, Yi Pan wrote:

Hi, Tommy,

Which version of Samza are you using? Since 0.10, the changelog partition
mapping has been moved to the coordinator stream, not in the checkpoint
topic any more.

That said, I want to ask a few more questions to understand what you
referred to as "non-deterministic" behavior. So, between the job restarts,
did the total number of tasks change? As you have observed, the total
number of partitions in a changelog topic is equivalent to the total number
of tasks in a job. And the reasons for the total number of tasks to change
include:
- the input topic partition changed
- the grouper algorithm changed
In both cases, the states are no longer considered valid, since data may
have been shuffled between the Kafka partitions, or between the tasks
already.

Could you clarify whether you saw the "non-determinism" w/ or w/o the total
number of tasks changed?

Thanks!

-Yi

On Thu, Aug 11, 2016 at 11:56 AM, Tommy Becker <tobec...@tivo.com> wrote:



We recently had an issue that caused us to lose the contents of one of our
Samza job's checkpoint topics. We were not that concerned about losing the
checkpointed offsets and so we restarted the job. We then started seeing
some very strange results and were able to trace them back to the fact that
the changelog partition mapping had changed. We were unaware this data was stored
in the checkpoint topic. Can someone explain why this mapping is necessary?
I was under the impression that the number of changelog partitions is
identical to the number of task instances. If this is so, can't partitions
just be assigned based on the task number? Assuming the mapping is
necessary, it would be nice if it was deterministic. Looking at
JobCoordinator, it seems to be dependent on the order in which things come
back in the map produced by the SystemStreamPartitionGrouper. This
non-determinism seems to have been the cause of our issues. Obviously data
loss is a problem, but it seems like Samza could have recreated the
original mapping. Should I file a bug on this?


Review Request 51689: Fix SAMZA-1012 - Generated changelog mappings are not consistent

2016-09-07 Thread Tommy Becker

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51689/
---

Review request for samza.


Repository: samza


Description
---

Fix SAMZA-1012. This patch sorts the results of the SSPGrouper so that 
changelog partitions are assigned in a predictable order.


Diffs
-

  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ba38b5c

Diff: https://reviews.apache.org/r/51689/diff/


Testing
---

Tested manually


Thanks,

Tommy Becker



Review Request 51613: Fix SAMZA-842

2016-09-02 Thread Tommy Becker

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51613/
---

Review request for samza.


Bugs: SAMZA-842
https://issues.apache.org/jira/browse/SAMZA-842


Repository: samza


Description
---

Stop JmxServer properly when using ThreadJob.


Diffs
-

  samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d4

Diff: https://reviews.apache.org/r/51613/diff/


Testing
---

Tested manually.


Thanks,

Tommy Becker



Re: Question on changelog partition mapping

2016-08-26 Thread Tommy Becker

Hey Yi,

Apologies for the lateness of my reply. Yeah, that makes sense, and we can 
certainly implement it. Would you consider accepting a PR that makes this change 
to the standard groupers? It's just strange that the generated partition 
mappings can vary like this, even for identical inputs.

-Tommy


On 08/16/2016 03:04 PM, Yi Pan wrote:

Hi, Tommy,

Yes. Now I understand what you referred to as "non-determinism". The
JobCoordinator was designed with the assumption that if no previous run is
found, we are free to start from scratch. I think the current solution you
can try is to implement a grouper that guarantees the order of groups
coming out of the group() method.

Does that make sense?

Thanks!

-Yi

On Fri, Aug 12, 2016 at 5:31 AM, Tommy Becker <tobec...@tivo.com> wrote:



Hi Yi,

Thanks for the response. We are running Samza 0.9.1, so we do not yet have
the coordinator stream. But to answer your other questions, the number of
task instances did not change. Specifically, none of the input topic, the
number of partitions in that topic, nor the grouper algorithm changed. The
non-determinism I am referring to can be seen here:

https://github.com/apache/samza/blob/0.9.1/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala#L141

Since we lost the original mapping, there is no previousChangelogeMapping
(sic) and the code creates a new mapping by simply assigning sequential
partition numbers from 0 to the number of tasks. But the order in which
these are assigned seems to be determined by the order of the TaskNames in
the map returned by the SystemStreamPartitionGrouper (i.e. no meaningful
order). So there is no guarantee that this code will produce the same
changelog mapping each time it runs, even if the number of tasks is the
same. Does that make sense?  The code has changed some since 0.9.1 but
seems to have the same issue even in 0.10.1.
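For what it's worth, the fix being discussed can be sketched as follows. This is a hypothetical stand-in for the JobCoordinator logic (plain strings instead of Samza's TaskName class, and not the actual Samza code): iterating the task names in sorted order makes the generated mapping independent of whatever iteration order the grouper's map happens to have.

```java
import java.util.*;

public class DeterministicMapping {
    // Assign changelog partitions 0..N-1 by iterating task names in sorted
    // order, so the mapping is reproducible across runs even when the
    // grouper returns its groups in no meaningful order.
    static Map<String, Integer> assign(Collection<String> taskNames) {
        List<String> sorted = new ArrayList<>(taskNames);
        Collections.sort(sorted);
        Map<String, Integer> mapping = new LinkedHashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            mapping.put(sorted.get(i), i);
        }
        return mapping;
    }

    public static void main(String[] args) {
        // The same task set presented in two different orders yields one mapping.
        Map<String, Integer> a = assign(Arrays.asList("Partition 2", "Partition 0", "Partition 1"));
        Map<String, Integer> b = assign(Arrays.asList("Partition 1", "Partition 2", "Partition 0"));
        System.out.println(a.equals(b));          // true: input order no longer matters
        System.out.println(a.get("Partition 0")); // 0
    }
}
```

Sorting is O(n log n) on restart, which is negligible next to the rest of job startup.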

-Tommy

On 08/11/2016 06:12 PM, Yi Pan wrote:

Hi, Tommy,

Which version of Samza are you using? Since 0.10, the changelog partition
mapping has been moved to the coordinator stream, not in the checkpoint
topic any more.

That said, I want to ask a few more questions to understand what you
referred to as "non-deterministic" behavior. So, between the job restarts,
did the total number of tasks change? As you have observed, the total
number of partitions in a changelog topic is equivalent to the total number
of tasks in a job. And the reasons for the total number of tasks to change
include:
- the input topic partition changed
- the grouper algorithm changed
In both cases, the states are no longer considered valid, since data may
have been shuffled between the Kafka partitions, or between the tasks
already.

Could you clarify whether you saw the "non-determinism" w/ or w/o the total
number of tasks changed?

Thanks!

-Yi

On Thu, Aug 11, 2016 at 11:56 AM, Tommy Becker 
<mailto:tobec...@tivo.com> wrote:



We recently had an issue that caused us to lose the contents of one of our
Samza job's checkpoint topics. We were not that concerned about losing the
checkpointed offsets and so we restarted the job. We then started seeing
some very strange results and were able to trace it back to the fact that
changelog partition mapping changed. We were unaware this data was stored
in the checkpoint topic. Can someone explain why this mapping is necessary?
I was under the impression that the number of changelog partitions is
identical to the number of task instances. If this is so, can't partitions
just be assigned based on the task number? Assuming the mapping is
necessary, it would be nice if it was deterministic. Looking at
JobCoordinator, it seems to be dependent on the order in which things come
back in the map produced by the SystemStreamPartitionGrouper. This
non-determinism seems to have been the cause of our issues. Obviously data
loss is a problem, but it seems like Samza could have recreated the
original mapping. Should I file a bug on this?

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com<http://www.digitalsmiths.com>
tobec...@tivo.com<mailto:tobec...@tivo.com>







Re: Question on changelog partition mapping

2016-08-12 Thread Tommy Becker

Hi Yi,

Thanks for the response. We are running Samza 0.9.1, so we do not yet have the 
coordinator stream. But to answer your other questions, the number of task 
instances did not change. Specifically, none of the input topic, the number of 
partitions in that topic, nor the grouper algorithm changed. The 
non-determinism I am referring to can be seen here:

https://github.com/apache/samza/blob/0.9.1/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala#L141

Since we lost the original mapping, there is no previousChangelogeMapping (sic) 
and the code creates a new mapping by simply assigning sequential partition 
numbers from 0 to the number of tasks. But the order in which these are 
assigned seems to be determined by the order of the TaskNames in the map 
returned by the SystemStreamPartitionGrouper (i.e. no meaningful order). So 
there is no guarantee that this code will produce the same changelog mapping 
each time it runs, even if the number of tasks is the same. Does that make 
sense?  The code has changed some since 0.9.1 but seems to have the same issue 
even in 0.10.1.

-Tommy

On 08/11/2016 06:12 PM, Yi Pan wrote:

Hi, Tommy,

Which version of Samza are you using? Since 0.10, the changelog partition
mapping has been moved to the coordinator stream, not in the checkpoint
topic any more.

That said, I want to ask a few more questions to understand what you
referred to as "non-deterministic" behavior. So, between the job restarts,
did the total number of tasks change? As you have observed, the total
number of partitions in a changelog topic is equivalent to the total number
of tasks in a job. And the reasons for the total number of tasks to change
include:
- the input topic partition changed
- the grouper algorithm changed
In both cases, the states are no longer considered valid, since data may
have been shuffled between the Kafka partitions, or between the tasks
already.

Could you clarify whether you saw the "non-determinism" w/ or w/o the total
number of tasks changed?

Thanks!

-Yi

On Thu, Aug 11, 2016 at 11:56 AM, Tommy Becker 
<mailto:tobec...@tivo.com> wrote:



We recently had an issue that caused us to lose the contents of one of our
Samza job's checkpoint topics. We were not that concerned about losing the
checkpointed offsets and so we restarted the job. We then started seeing
some very strange results and were able to trace it back to the fact that
changelog partition mapping changed. We were unaware this data was stored 
in the checkpoint topic. Can someone explain why this mapping is necessary?
I was under the impression that the number of changelog partitions is
identical to the number of task instances. If this is so, can't partitions
just be assigned based on the task number? Assuming the mapping is
necessary, it would be nice if it was deterministic. Looking at
JobCoordinator, it seems to be dependent on the order in which things come
back in the map produced by the SystemStreamPartitionGrouper. This
non-determinism seems to have been the cause of our issues. Obviously data
loss is a problem, but it seems like Samza could have recreated the
original mapping. Should I file a bug on this?

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com<http://www.digitalsmiths.com><http://www.digitalsmiths.com><http://www.digitalsmiths.com>
tobec...@tivo.com<mailto:tobec...@tivo.com><mailto:tobec...@tivo.com><mailto:tobec...@tivo.com>









--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com<http://www.digitalsmiths.com>
tobec...@tivo.com<mailto:tobec...@tivo.com>





Question on changelog partition mapping

2016-08-11 Thread Tommy Becker

We recently had an issue that caused us to lose the contents of one of our 
Samza job's checkpoint topics. We were not that concerned about losing the 
checkpointed offsets and so we restarted the job. We then started seeing some 
very strange results and were able to trace it back to the fact that changelog 
partition mapping changed. We were unaware this data was stored in the 
checkpoint topic. Can someone explain why this mapping is necessary? I was 
under the impression that the number of changelog partitions is identical to 
the number of task instances. If this is so, can't partitions just be assigned 
based on the task number? Assuming the mapping is necessary, it would be nice 
if it was deterministic. Looking at JobCoordinator, it seems to be dependent on 
the order in which things come back in the map produced by the 
SystemStreamPartitionGrouper. This non-determinism seems to have been the cause 
of our issues. Obviously data loss is a problem, but it seems like Samza could 
have recreated the original mapping. Should I file a bug on this?

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com<http://www.digitalsmiths.com>
tobec...@tivo.com<mailto:tobec...@tivo.com>





Re: 0.10.1 Release

2016-07-27 Thread Tommy Becker

We're going on 3 months here; is a 0.10.1 release still planned?

On 05/12/2016 01:53 PM, Yi Pan wrote:

Hi, Andy,

We are doing some pre-release work at this moment. My rough estimation on
0.10.1 timeline would be about 1 month away.

Thanks a lot!

-Yi

On Thu, May 12, 2016 at 9:56 AM, Andy Throgmorton 
<mailto:a...@andythrogmorton.com>
wrote:



Hi,

I'm wondering if anyone has a rough estimate on when 0.10.1 will be
released? Our team recently encountered SAMZA-920 (now fixed in 0.10.1) and
we're trying to determine if we need to apply the bug fix locally as an
interim solution.


Thanks,
Andy






--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com<http://www.digitalsmiths.com>
tobec...@tivo.com<mailto:tobec...@tivo.com>





Long changelog restoration times?

2016-03-10 Thread Tommy Becker

We had one of our Samza jobs restart overnight recently and noticed that 
restoration from the changelog took much longer than I would expect (well over 
an hour). Looking through the logs, the throughput initially seems reasonable 
if not stellar. But nearly every container seems to encounter one or more long 
pauses during the restoration process:

2016-03-09 03:50:09,940 (default) [main] INFO  
[org.apache.samza.storage.kv.KeyValueStorageEngine]  - 5600 entries 
restored...
2016-03-09 03:50:19,895 (default) [main] INFO  
[org.apache.samza.storage.kv.KeyValueStorageEngine]  - 5700 entries 
restored...
2016-03-09 03:51:41,310 (default) [main] INFO  
[org.apache.samza.storage.kv.KeyValueStorageEngine]  - 5800 entries 
restored...
2016-03-09 04:22:13,003 (default) [main] INFO  
[org.apache.samza.storage.kv.KeyValueStorageEngine]  - 5900 entries 
restored...



Here we see a nearly 30 minute span with no logs. So far as we can tell, Kafka is healthy 
during this period and other containers are making progress restoring their partitions 
around this time, so the "gaps" are not happening at the same time across 
containers. We are running Samza 0.9.1 on a YARN cluster in AWS so some variance in 
performance is to be expected, but this seems pretty extreme. Is anyone else seeing this 
behavior?

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com<http://www.digitalsmiths.com>
tobec...@tivo.com<mailto:tobec...@tivo.com>





Re: Passing Java system properties to YARN containers of Samza Job

2015-12-09 Thread Tommy Becker

You should also prefix the names of the API keys with "sensitive.". This will 
mask the values when the config is logged and in the AM UI.
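For example, a sketch of how that might look on the run-job.sh command line (the key names here are hypothetical placeholders, not official Samza config keys; the "sensitive." prefix is what triggers the masking):

```
# Passed at submit time rather than committed to the job's properties file;
# the "sensitive." prefix masks the value in logs and in the AM UI.
bin/run-job.sh \
  --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
  --config-path=file://$PWD/config/my-job.properties \
  --config sensitive.aws.access.key=AKIA... \
  --config sensitive.aws.secret.key=...
```

The values still reach the container's config, so code can read them there instead of relying on JVM system properties.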

On 12/09/2015 01:19 AM, Yi Pan wrote:

Hi, Gordon,

Try to use --config X.Y.Z=value from the command line to run run-job.sh

On Tue, Dec 8, 2015 at 5:57 PM, Gordon Tai 
<mailto:gor...@vm5.com> wrote:



Hi Rad,

Our Samza jobs run on on-premises clusters, on top of YARN and Kafka. So
IAM roles won't be an option either.

BR,

On 9 December 2015 at 04:09, Rad Gruchalski 
<mailto:ra...@gruchalski.com> wrote:



Gordon,

If your tasks run on EC2, you should use IAM roles.
https://www.youtube.com/watch?v=XuRM4Id6uDY










Kind regards,
Radek Gruchalski
ra...@gruchalski.com<mailto:ra...@gruchalski.com> (mailto:ra...@gruchalski.com) 
(mailto:
ra...@gruchalski.com<mailto:ra...@gruchalski.com>)
de.linkedin.com/in/radgruchalski/ (
http://de.linkedin.com/in/radgruchalski/)

Confidentiality:
This communication is intended for the above-named person and may be
confidential and/or legally privileged.
If it has come to you in error you must take no action based on it, nor
must you copy or show it to anyone; please delete/destroy and inform the
sender immediately.



On Tuesday, 8 December 2015 at 18:57, Gordon Tai wrote:



Hi,

Is there any convenient way to pass custom Java system properties to Samza jobs?
Specifically, since the system properties I wish to pass are aws.accessKeyId and
aws.secretKey for AWS credentials
(http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html),
writing them into the Samza job config file doesn't seem like a good idea.
Any help is great, thanks!

BR,
Gordon

--
*Gordon Tai* | Data Engineer | gor...@vm5.com<mailto:gor...@vm5.com> | 
http://www.vm5.com



*VMFIVE CONFIDENTIAL*
This message, including any attachments, may contain information that is
confidential, proprietary, privileged or otherwise protected by law, and is
intended only for the parties involved in the subject discussed therein. If
you have received this e-mail by mistake, please immediately delete the
message and notify the sender of such incident. Please be noted that any
unauthorized use, dissemination, distribution or copying of this email is
strictly prohibited.











--
*Gordon Tai* | Data Engineer | gor...@vm5.com<mailto:gor...@vm5.com> | 
http://www.vm5.com









--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com<http://www.digitalsmiths.com>
tobec...@tivo.com<mailto:tobec...@tivo.com>





Re: Review Request 40106: SAMZA-812 CachedStore flushes too often

2015-11-11 Thread Tommy Becker


> On Nov. 11, 2015, 2:31 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala, 
> > line 37
> > <https://reviews.apache.org/r/40106/diff/1/?file=1120744#file1120744line37>
> >
> > question: so, here by preserving the old behavior for array keys, the 
> > end result is that the array keys would be immediately flushed out to the 
> > store as they are today, right? Wouldn't it be nicer to fix the 
> > CachedStore's cache hit issue w/ array keys s.t. array keys and other 
> > primitive type of keys behave the same?

I think boxed primitives will work ok as is since they have sane 
equals/hashCode methods; arrays don't. See 
https://issues.apache.org/jira/browse/SAMZA-505 for background on the current 
behavior. In short, we could special case arrays, but they thought it would be 
better to just call out the semantics of the cache more loudly.
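To illustrate why array keys are the odd case: Java arrays inherit Object's identity-based equals/hashCode, so an object-keyed cache can never get a hit on a logically-equal array key. A small self-contained demo (not Samza code, just the underlying Java behavior):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ArrayKeyDemo {
    public static void main(String[] args) {
        byte[] k1 = {1, 2, 3};
        byte[] k2 = {1, 2, 3};

        // Arrays use Object's reference-based equals/hashCode, so two
        // equal-content arrays are distinct hash keys...
        System.out.println(k1.equals(k2));         // false
        Map<byte[], String> cache = new HashMap<>();
        cache.put(k1, "value");
        System.out.println(cache.get(k2));         // null: cache miss

        // ...even though their contents compare equal by value.
        System.out.println(Arrays.equals(k1, k2)); // true
    }
}
```

This is why the patch preserves the immediate-flush behavior for array keys rather than caching them.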


- Tommy


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/40106/#review106023
-------


On Nov. 9, 2015, 4:30 p.m., Tommy Becker wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/40106/
> ---
> 
> (Updated Nov. 9, 2015, 4:30 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-812
> https://issues.apache.org/jira/browse/SAMZA-812
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Fix for SAMZA-812. Only flush CachedStore when necessary, with the exception 
> that this preserves the buggy flush behavior for array keys. Otherwise the 
> store will not behave properly for array keys due to the mismatch between the 
> reference semantics of the cache vs the value semantics of the store. See the 
> bug for details.
> 
> 
> Diffs
> -
> 
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala 
> 1112350 
>   samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala 
> cc9c9f3 
> 
> Diff: https://reviews.apache.org/r/40106/diff/
> 
> 
> Testing
> ---
> 
> Unit tested
> 
> 
> Thanks,
> 
> Tommy Becker
> 
>



Review Request 40106: SAMZA-812 CachedStore flushes too often

2015-11-09 Thread Tommy Becker

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/40106/
---

Review request for samza.


Bugs: SAMZA-812
https://issues.apache.org/jira/browse/SAMZA-812


Repository: samza


Description
---

Fix for SAMZA-812. Only flush CachedStore when necessary, with the exception 
that this preserves the buggy flush behavior for array keys. Otherwise the 
store will not behave properly for array keys due to the mismatch between the 
reference semantics of the cache vs the value semantics of the store. See the 
bug for details.


Diffs
-

  samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala 1112350 
  samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala 
cc9c9f3 

Diff: https://reviews.apache.org/r/40106/diff/


Testing
---

Unit tested


Thanks,

Tommy Becker



Re: How to set up RocksDB

2015-11-09 Thread Tommy Becker

The native RocksDB libs are included in the jar file, but they are extracted at 
runtime before they can be loaded. Make sure the user running the job has 
permission to write to...wherever it is that it tries to put it (sorry don't 
know offhand). But you shouldn't have to set any environment variables or 
anything for it to work.
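If you want to verify the permission angle, a quick check of the JVM temp dir might look like the following. (Assumption: the JVM temp dir is one common extraction location for JNI libraries; the exact directory rocksdbjni uses varies by version.)

```java
import java.io.File;
import java.io.IOException;

public class TmpDirCheck {
    public static void main(String[] args) throws IOException {
        // rocksdbjni extracts its native library to disk at runtime before
        // loading it; verify the JVM temp dir is writable by the user
        // running the job, since a read-only dir would break extraction.
        File tmp = new File(System.getProperty("java.io.tmpdir"));
        System.out.println("tmpdir: " + tmp + " writable: " + tmp.canWrite());

        // If creating a probe file succeeds, library extraction should too.
        File probe = File.createTempFile("rocksdb-probe", ".so", tmp);
        probe.delete(); // cleanup
    }
}
```

Note that the SIGSEGV you pasted is a different failure mode (a native crash after loading), so if the temp dir checks out, the glibc/OS compatibility of the bundled .so is the next thing to look at.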

On 11/09/2015 08:50 AM, Lukáš Havrlant wrote:

Hello,
we have a problem with configuring RocksDB as a key-value store with our
Samza 0.9.0 project. We added this line to our config:

stores.my-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory

But then our Samza job couldn't locate the
RocksDbKeyValueStorageEngineFactory class. So we added compile
"org.apache.samza:samza-kv-rocksdb_2.10:$samzaVersion" to our gradle build
script. It helped but now Samza failed to start because it couldn't
find librocksdbjni-linux64.so library. We added export
ROCKSDB_SHAREDLIB_DIR=/opt/samza/lib to our init script and it didn't help.
Then we manually copied the librocksdbjni-linux64.so from
the rocksdbjni-3.5.1.jar to the /opt/samza/lib folder and Samza still
couldn't start, now because of

2015-11-09T14:35:06,209 INFO [main]
org.apache.samza.container.SamzaContainer$ - Got store consumers:
Map(tracking-info ->
org.apache.samza.system.kafka.KafkaSystemConsumer@78461bc4)
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x003c4de092ab, pid=4410, tid=139728971949824

And we're out of ideas at the moment. Can you please help me set up RocksDB
correctly? I suppose we are doing something completely wrong.

Is it possible to use another key-value store at least?

Lukáš Havrlant



--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com<http://www.digitalsmiths.com>
tobec...@tivo.com<mailto:tobec...@tivo.com>





Re: Streaming question

2015-09-18 Thread Tommy Becker

I'm not familiar with Storm, but Samza doesn't really have any abstractions 
above the job level. Meaning there's nothing I'm aware of that will 
automatically configure your jobs such that they link together in some order 
you specify. You just have to manually configure task.inputs on the downstream 
jobs to read what you're writing from the upstream ones. That is a nice concept 
though!
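Concretely, the manual wiring might look like this (the topic and system names here are hypothetical):

```
# Job one emits to an intermediate Kafka topic from its task, e.g.:
#   collector.send(new OutgoingMessageEnvelope(
#       new SystemStream("kafka", "grouped-events"), key, value));

# Job two's config then names that topic as its input:
task.inputs=kafka.grouped-events
```

The linkage lives entirely in the two jobs' configs; Samza itself has no notion of the pipeline as a whole.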

On 09/18/2015 12:38 AM, Mallikarjuna Rao wrote:

Hi Samaza Gurus,



I want to send the output of one task to the input of another task for further
processing. For example, in task one I want to read data from a Kafka queue and
emit messages grouped by key; in task two I want to read the messages emitted
from task one and process them further. Essentially I have to link task one's
output to task two, which should read the output from task one and do further
processing. Any pointers are highly appreciated.



This is very similar to Spark and Storm, where the output of one task can be fed
as input to another task. Please kindly advise whether tasks can be chained or
linked in Samza. An example from Storm is provided below for better
understanding of the problem.



Please see following snippet from storm processing, how stepOne output is
linked to StepTwo.



builder.setBolt("StepOne", new StepIdentificationRichBolt(), 1).setNumTasks(1)
       .globalGrouping("KafkaSpout");

builder.setBolt("StepTwo", new ProcessIdentificationRichBolt_With_TrimSequence(), 4).setNumTasks(4)
       .fieldsGrouping("StepOne", new Fields("UserName"));



Output of stepOne send as input to steptwo in above line of code in storm.








Thanks regards,



Annadath rao




--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com<http://www.digitalsmiths.com>
tobec...@tivo.com<mailto:tobec...@tivo.com>





YARN scheduler configuration

2015-09-14 Thread Tommy Becker

We are currently running Samza on a YARN grid. We unintentionally got into a situation 
where we needed more capacity than was available (according to YARN), and we found that 
replacement container requests would just sit there indefinitely waiting to be fulfilled 
rather than failing. Our monitoring was unable to detect that there were jobs that, 
despite being in a "RUNNING" state, were not doing anything because they were 
starved for containers. Is there a way to configure YARN to reject container requests 
that can't be immediately satisfied?

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com<http://www.digitalsmiths.com>
tobec...@tivo.com<mailto:tobec...@tivo.com>





Re: Coordinator URL always 127.0.0.1

2015-07-31 Thread Tommy Becker

I realize the JC runs in the AM YARN container.  The SamzaContainers run in 
their own YARN containers, which may or may not be on the same node as the AM 
in the YARN grid.  My point is simply that when they are not, it does not work. 
 This is because the AM is telling the SamzaContainers to fetch the config from 
127.0.0.1.

-Tommy

On 07/31/2015 02:31 PM, Navina Ramesh wrote:

Hi Tommy,
I am not sure what you mean below:
{quote}
this context by container I meant the SamzaContainer.  What we are seeing
is that jobs only start when YARN happens to place the AM and
SamzaContainer(s) on the same node.Which is increasingly unlikely as you
increase container count for your job and/or expand your YARN grid.
{quote}
Any YARN application finds a yarn container to run the AM. In our case, JC
is running in the same container as the AM. So, I don't understand why this
will cause an issue on cluster expansion. I can understand your concern if
JC and AM are running on 2 separate containers.

Please correct me if I have misunderstood your statement.

Thanks!
Navina

On Fri, Jul 31, 2015 at 6:05 AM, Tommy Becker 
<mailto:tobec...@tivo.com> wrote:



Hey Yan,

-- I do not quite understand this part. AM essentially is running in a
container as well. And the http server is brought up in the same container

Sorry, the term "container" is overloaded.  In this context by container I
meant the SamzaContainer.  What we are seeing is that jobs only start when
YARN happens to place the AM and SamzaContainer(s) on the same node.  Which
is increasingly unlikely as you increase container count for your job
and/or expand your YARN grid.

-Tommy

On 07/30/2015 10:08 PM, Yan Fang wrote:

Hi Tommy,

{quote}
Because I don't see how this is ever going to work in scenarios where the
AM is on a different node than the containers.
{quote}

-- I do not quite understand this part. AM essentially is running in a
container as well. And the http server is brought up in the same container.

{quote}
even if we can't get a better address for the AM from YARN, we could at
least filter the addresses we get back from the JVM to exclude loopbacks.
{quote}

-- You are right. InetAddress.getLocalHost() gives back a loopback address
sometimes. We should filter this out. Just googled one possible solution:
<http://www.coderanch.com/t/491883/java/java/IP>.

+ @Yi, @Navina,

Also, I think this fix should go to the 0.10.0 release.

What do you guys think?

Thanks,

Fang, Yan
yanfang...@gmail.com<mailto:yanfang...@gmail.com>

On Thu, Jul 30, 2015 at 6:39 PM, Yan Fang 
<mailto:yanfang...@gmail.com> wrote:



Just one point to add:

{quote}
AM gets notified of container status from the RM.
{quote}

I think this is not 100% correct. AM can communicate with NM through
NMClientAsync
<https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/yarn/client/api/async/NMClientAsync.html>
to get container status, though Samza does not implement the CallbackHandler.

Thanks,

Fang, Yan
yanfang...@gmail.com<mailto:yanfang...@gmail.com>

On Thu, Jul 30, 2015 at 6:06 PM, Navina Ramesh <
nram...@linkedin.com.invalid<mailto:nram...@linkedin.com.invalid>> wrote:



The NM (and hence, by extension the container) heartbeats to the RM, not
the AM. AM gets notified of container status from the RM.
The AM starts / stops /releases a container process by communicating to
the
NM.

Navina


On Thu, Jul 30, 2015 at 5:55 PM, Thomas Becker 
<mailto:tobec...@tivo.com><mailto:tobec...@tivo.com> wrote:



Ok, I thought there was some communication from the container to the AM,
it sounds like you're saying it's in the other direction only? Don't
containers heartbeat to the AM? Regardless, even if we can't get a better
address for the AM from YARN, we could at least filter the addresses we get
back from the JVM to exclude loopbacks.

-Tommy
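A minimal sketch of that filtering, assuming we simply want the first non-loopback IPv4 address the JVM can see (the class and method names here are hypothetical, not Samza code):

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;

public class NonLoopbackAddress {
    // Enumerate all network interfaces and prefer a non-loopback IPv4
    // address over whatever InetAddress.getLocalHost() returns (which is
    // often 127.0.0.1 when /etc/hosts maps the hostname to loopback).
    static InetAddress firstNonLoopback() throws SocketException {
        for (NetworkInterface nic : Collections.list(NetworkInterface.getNetworkInterfaces())) {
            for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                if (!addr.isLoopbackAddress() && addr.getAddress().length == 4) {
                    return addr; // first non-loopback IPv4 found
                }
            }
        }
        return InetAddress.getLoopbackAddress(); // fall back if none found
    }

    public static void main(String[] args) throws SocketException {
        System.out.println(firstNonLoopback().getHostAddress());
    }
}
```

On a multi-homed host this still picks an arbitrary interface, so a config override would be needed for full generality.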

From: Navina Ramesh [nram...@linkedin.com.INVALID<mailto:nram...@linkedin.com.INVALID>]
Sent: Thursday, July 30, 2015 8:40 PM
To: dev@samza.apache.org<mailto:dev@samza.apache.org>
Subject: Re: Coordinator URL always 127.0.0.1

Hi Tommy,
Yi is right. Container start is coordinated by the AppMaster using an
NMClient. Container host name and port is 

Re: Coordinator URL always 127.0.0.1

2015-07-31 Thread Tommy Becker

Hi, Tommy,

I think it might be a commonly asked question regarding multiple IPs on a
single host. A common trick w/o changing code is (copied from SO:
http://stackoverflow.com/questions/2381316/java-inetaddress-getlocalhost-returns-127-0-0-1-how-to-get-real-ip):

{code}
1. Find your host name. Type: hostname. For example, you find your hostname
   is mycomputer.xzy.com
2. Put your host name in your hosts file, /etc/hosts. Such as:
   10.50.16.136 mycomputer.xzy.com
{code}

-Yi

On Thu, Jul 30, 2015 at 11:35 AM, Tommy Becker 
<mailto:tobec...@tivo.com> wrote:





We are testing some jobs on a YARN grid and noticed they are often not
starting up properly due to being unable to connect to the job
coordinator.  After some investigation it seems as if the jobs are always
getting a coordinator URL of http://127.0.0.1:  But my understanding is
that the coordinator runs only in the AM, so I'd expect these URLs to more
often than not be to some other machine.  Looking at the code however, I'm
not sure how that would ever happen, since the URL for the coordinator
always comes from InetAddress.getLocalHost().getHostAddress() in
org.apache.samza.coordinator.server.HttpServer#getUrl

Am I off base here?  Because I don't see how this is ever going to work in
scenarios where the AM is on a different node than the containers.

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com



This email and any attachments may contain confidential and privileged
material for the sole use of the intended recipient. Any review, copying,
or distribution of this email (or any attachments) by others is
prohibited. If you are not the intended recipient, please contact the
sender immediately and permanently delete this email and any attachments.
No employee or agent of TiVo Inc. is authorized to conclude any binding
agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
Inc. may only be made by a signed written agreement.















--
Navina R.



















--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com





Coordinator URL always 127.0.0.1

2015-07-30 Thread Tommy Becker

We are testing some jobs on a YARN grid and noticed they are often not starting up 
properly due to being unable to connect to the job coordinator. After some 
investigation it seems as if the jobs are always getting a coordinator URL of 
http://127.0.0.1:  But my understanding is that the coordinator runs only 
in the AM, so I'd expect these URLs to more often than not be to some other machine.  
Looking at the code however, I'm not sure how that would ever happen since the URL 
for the coordinator always comes from InetAddress.getLocalHost().getHostAddress() in 
org.apache.samza.coordinator.server.HttpServer#getUrl

Am I off base here?  Because I don't see how this is ever going to work in 
scenarios where the AM is on a different node than the containers.

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com





Access to high-watermark from within a Samza job

2015-07-23 Thread Tommy Becker

I'm writing a Samza job that basically serves to pump data out of Kafka into 
another system.  For my particular use-case, I want to essentially process the 
entire topic as it exists when the job starts and then exit.  As far as I can 
tell, there doesn't seem to be a way to do that right now because it is 
impossible for the job to determine the high-watermark of the topics it's 
processing.  I found this issue that mentions adding a getHighWatermark() to 
IncomingMessageEnvelope:

https://issues.apache.org/jira/browse/SAMZA-539

The use-case discussed there seems to be metrics but this API would enable mine 
as well.  This seems pretty trivial to add, is there some reason it hasn't been 
done yet?  Otherwise I can take a stab at it.  Or is there another way to do 
what I need that I'm unaware of?
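The shutdown condition described above can be modeled with a few lines of plain Java. This is only a sketch of the logic; getHighWatermark() is the API proposed in SAMZA-539, not something that exists yet, so the per-partition watermarks here are simply supplied as a map:

```java
import java.util.HashMap;
import java.util.Map;

public class DrainCheck {
    // Returns true once every partition's consumed offset has reached the
    // high watermark captured at job start. A real job would populate
    // highWatermarks via the proposed getHighWatermark() API (SAMZA-539).
    public static boolean caughtUp(Map<Integer, Long> highWatermarks,
                                   Map<Integer, Long> consumed) {
        for (Map.Entry<Integer, Long> e : highWatermarks.entrySet()) {
            if (consumed.getOrDefault(e.getKey(), 0L) < e.getValue()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, Long> high = new HashMap<>();
        high.put(0, 100L);
        high.put(1, 50L);
        Map<Integer, Long> consumed = new HashMap<>();
        consumed.put(0, 100L);
        consumed.put(1, 49L);
        System.out.println(caughtUp(high, consumed)); // false: partition 1 lags
        consumed.put(1, 50L);
        System.out.println(caughtUp(high, consumed)); // true: fully drained
    }
}
```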

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com





Re: Missing artifact? org.apache.samza:samza-serializers_2.10:jar:0.9.0

2015-06-10 Thread Tommy Becker

I'm sure others will chime in, but I think that jar was eliminated.  The 
serializers seem to be in samza-core now.

On 06/09/2015 08:31 PM, Mart Haitjema wrote:
Hi all,

I was following the instructions (http://samza.apache.org/startup/download/) to 
upgrade to Samza 0.9.0 but ran into a maven build failure - "Failure to find 
org.apache.samza:samza-serializers_2.10:jar:0.9.0". Looking in the Apache Maven 
repository linked from the page, I don’t see a 0.9.0 under 
https://repository.apache.org/content/groups/public/org/apache/samza/samza-serializers_2.10/.
 Is this missing from the maven repo by chance? Thanks in advance,

-Mart



--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com





Re: Issues with certain characters in config values

2015-06-09 Thread Tommy Becker

Hi Navina,
Yes, I'm seeing the issue on 0.8.0.  As I mentioned in the bug, I know the 
method of passing the config is changing, but I still wanted to get the issue 
on record.  With regard to Util.envVarEscape(), the assumption seems to be that 
the value is going through the shell at some point, but I don't think that's 
true.  So the extra backslashes never get removed (there's no 
envVarUnescape()), resulting in invalid JSON.

-Tommy

On 06/09/2015 01:15 PM, Navina Ramesh wrote:

Hi Tommy,

I wouldn't think Util.envVarEscape() is totally unnecessary. Are you
running into this issue with a version <= 0.9 ?
With 0.10, I think we no longer pass all the environment variables through
command line. We only pass the Coordinator system information via command
line.

While Util.envVarEscape() is not exactly smart about which characters it
escapes, it is a vital minimum to have.

Thanks!
Navina


On 6/9/15, 5:22 AM, "Tommy Becker" <tobec...@tivo.com> wrote:



Ok, the issue with spaces appears to have been addressed by
https://issues.apache.org/jira/browse/SAMZA-530.  But quotes are still a
problem, and I'm becoming increasingly convinced that Util.envVarEscape()
method is unnecessary.

On 06/08/2015 03:02 PM, Tommy Becker wrote:
We have a stream job that uses an MVEL expression to filter messages to
process.  The expression is passed through the Config, and I noticed that
the expressions were often getting mangled and/or blowing up the job.
I've seen issues with at least 2 characters, quotes and spaces.  In
version 0.8.0 on YARN, the config gets passed through an environment
variable.  Prior to this, the JSON gets run through Util.envVarEscape()
which backslash escapes single and double quotes.  I'm trying to figure
out why this is necessary, because the backslashes never seem to get
removed and then the JSON fails to parse since \' is not a valid JSON
escape sequence.  I also can't seem to get values with spaces in them to
pass through at all (they get truncated at the first space).  I haven't
figured out what's going on there, but wondered if it could be a bug in
the YARN client.  I filed the following bug for this, can anyone shed any
light?

https://issues.apache.org/jira/browse/SAMZA-700

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com





--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com








--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com





Re: Issues with certain characters in config values

2015-06-09 Thread Tommy Becker

Ok, the issue with spaces appears to have been addressed by 
https://issues.apache.org/jira/browse/SAMZA-530.  But quotes are still a 
problem, and I'm becoming increasingly convinced that Util.envVarEscape() 
method is unnecessary.

On 06/08/2015 03:02 PM, Tommy Becker wrote:
We have a stream job that uses an MVEL expression to filter messages to 
process.  The expression is passed through the Config, and I noticed that the 
expressions were often getting mangled and/or blowing up the job.  I've seen 
issues with at least 2 characters, quotes and spaces.  In version 0.8.0 on 
YARN, the config gets passed through an environment variable.  Prior to this, 
the JSON gets run through Util.envVarEscape() which backslash escapes single 
and double quotes.  I'm trying to figure out why this is necessary, because the 
backslashes never seem to get removed and then the JSON fails to parse since \' 
is not a valid JSON escape sequence.  I also can't seem to get values with 
spaces in them to pass through at all (they get truncated at the first space).  
I haven't figured out what's going on there, but wondered if it could be a bug 
in the YARN client.  I filed the following bug for this, can anyone shed any 
light?

https://issues.apache.org/jira/browse/SAMZA-700

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com





--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com





Issues with certain characters in config values

2015-06-08 Thread Tommy Becker

We have a stream job that uses an MVEL expression to filter messages to 
process.  The expression is passed through the Config, and I noticed that the 
expressions were often getting mangled and/or blowing up the job.  I've seen 
issues with at least 2 characters, quotes and spaces.  In version 0.8.0 on 
YARN, the config gets passed through an environment variable.  Prior to this, 
the JSON gets run through Util.envVarEscape() which backslash escapes single 
and double quotes.  I'm trying to figure out why this is necessary, because the 
backslashes never seem to get removed and then the JSON fails to parse since \' 
is not a valid JSON escape sequence.  I also can't seem to get values with 
spaces in them to pass through at all (they get truncated at the first space).  
I haven't figured out what's going on there, but wondered if it could be a bug 
in the YARN client.  I filed the following bug for this, can anyone shed any 
light?

https://issues.apache.org/jira/browse/SAMZA-700
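The mangling described above can be reproduced with a toy version of the escaping. Note this is an assumed mimic for illustration, not the real Util.envVarEscape(): backslash-escaping quotes produces \', which is not a legal JSON escape sequence, so a JSON parser rejects the result unless the backslashes are stripped first.

```java
public class EscapeDemo {
    // Toy mimic of the described behavior: backslash-escape single and
    // double quotes. An assumption for illustration, not Samza's actual
    // Util.envVarEscape().
    static String envVarEscape(String s) {
        return s.replace("'", "\\'").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        String json = "{\"filter\": \"type == 'show'\"}";
        String escaped = envVarEscape(json);
        // The escaped string now contains \' which the JSON grammar does
        // not allow, so parsing the value as JSON fails.
        System.out.println(escaped.contains("\\'")); // true
    }
}
```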

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com





Did 0.9.1 ever get released?

2015-06-03 Thread Tommy Becker

Did 0.9.1 ever get released?  I don't see it in Maven Central, nor has the site 
been updated...

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com





Re: Logback logger

2015-05-14 Thread Tommy Becker

Sorry for not being clear, I'm saying we include logback.xml in the task's *jar* 
file, not just the Samza job's tarball.

On 05/14/2015 11:19 AM, Davide Simoncelli wrote:

It is included and the root level is set to info. But it prints debug logs as 
well.

Also, looking at how the class path is built in run-class.sh, it doesn’t 
include the lib folder where logback.xml is stored.

Davide



On 14 May 2015, at 3:55 pm, Tommy Becker <tobec...@tivo.com> wrote:

We use logback as well and simply include logback.xml in the task's jar file.

On 05/14/2015 08:48 AM, Davide Simoncelli wrote:

Hello,

I’m using logback in a Samza project I’m working on. In the dist.tar.gz package 
slf4j-log4j12 is not included.
But logback requires the configuration file to be in the class path and the 
run-class.sh script only includes *.jar packages from the lib folder.
Do you know how I can sort that issue? Should I use a custom run-class.sh 
script?

Thanks

Davide



--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com








--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com





Re: Logback logger

2015-05-14 Thread Tommy Becker

We use logback as well and simply include logback.xml in the task's jar file.
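One quick way to sanity-check that the packaging worked — that logback.xml really ended up in the jar and is visible at runtime — is a classpath lookup. A small sketch (logback itself looks for logback.xml on the classpath):

```java
import java.net.URL;

public class LogbackConfigCheck {
    public static void main(String[] args) {
        // Logback resolves logback.xml via the classpath; if this lookup
        // returns null, the file was not packaged into the jar (or the
        // jar is not on the classpath).
        URL cfg = Thread.currentThread().getContextClassLoader()
                        .getResource("logback.xml");
        System.out.println(cfg != null ? "found: " + cfg
                                       : "logback.xml not on classpath");
    }
}
```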

On 05/14/2015 08:48 AM, Davide Simoncelli wrote:

Hello,

I’m using logback in a Samza project I’m working on. In the dist.tar.gz package 
slf4j-log4j12 is not included.
But logback requires the configuration file to be in the class path and the 
run-class.sh script only includes .*jar packages from lib folder.
Do you know how I can sort that issue? Should I use a custom run-class.sh 
script?

Thanks

Davide



--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com





Re: Review Request 33674: Fix SAMZA-660 - Default serdes do not work for changelog streams.

2015-04-29 Thread Tommy Becker


> On April 29, 2015, 1:06 p.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, 
> > line 119
> > <https://reviews.apache.org/r/33674/diff/1/?file=945326#file945326line119>
> >
> > This "config" is not used anymore. remove it?

Oops, I missed that.


- Tommy


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33674/#review81978
-------


On April 29, 2015, 2:28 p.m., Tommy Becker wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33674/
> ---
> 
> (Updated April 29, 2015, 2:28 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Fix SAMZA-660 - Default serdes do not work for changelog streams.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 56819e0 
>   
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
> 58d7fe8 
> 
> Diff: https://reviews.apache.org/r/33674/diff/
> 
> 
> Testing
> ---
> 
> Unit tested; had trouble running master due to some issue with the 
> CoordinatorStream commit.
> 
> 
> Thanks,
> 
> Tommy Becker
> 
>



Re: Review Request 33674: Fix SAMZA-660 - Default serdes do not work for changelog streams.

2015-04-29 Thread Tommy Becker

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33674/
---

(Updated April 29, 2015, 2:28 p.m.)


Review request for samza.


Changes
---

Addressed comment.


Repository: samza


Description
---

Fix SAMZA-660 - Default serdes do not work for changelog streams.


Diffs (updated)
-

  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
56819e0 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
58d7fe8 

Diff: https://reviews.apache.org/r/33674/diff/


Testing
---

Unit tested; had trouble running master due to some issue with the 
CoordinatorStream commit.


Thanks,

Tommy Becker



Review Request 33674: Fix SAMZA-660 - Default serdes do not work for changelog streams.

2015-04-29 Thread Tommy Becker

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33674/
---

Review request for samza.


Repository: samza


Description
---

Fix SAMZA-660 - Default serdes do not work for changelog streams.


Diffs
-

  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
56819e0 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
58d7fe8 

Diff: https://reviews.apache.org/r/33674/diff/


Testing
---

Unit tested; had trouble running master due to some issue with the 
CoordinatorStream commit.


Thanks,

Tommy Becker



Re: Dealing with partitioning mismatches between bootstrap and input streams

2015-04-08 Thread Tommy Becker

Thanks for the reply Chris.  I got solution 1 more or less implemented while 
getting my bearings.  I then started looking into solution 2 and made some 
progress, but now I'm starting to wonder how well the shared state store fits 
our particular use-case.  As I mentioned, we need to use a bootstrap stream 
rather than a changelog to build the shared store because we need to manipulate 
the data first.  But the shared state store design calls for it to be 
read-only.  So my flow now looks like this:

1) Some task gets assigned the single-partition bootstrap stream and consumes 
it.
2) Since the shared state store is read-only, the "bootstrap task" has to 
instead write data to a changelog topic that backs the shared store.
3) Something is continually reading the changelog and writing to the shared 
store to make updates available to StreamTasks.

Number 3 is where I'm kinda stuck now.  Given the desire to keep the 
SamzaContainer single-threaded, I'm not sure how best to continually consume 
the changelog, short of routing it through the RunLoop like all other streams 
and using a dedicated TaskInstance to consume it. That seems like a big change 
though, all to get to a flow that feels pretty convoluted.  Can you think of 
better way to implement that?


On 04/07/2015 05:32 PM, Chris Riccomini wrote:

Hey Tommy,

Your summary sounds pretty accurate. One other way, which requires no
change to Samza, would be to repartition the input topic properly for each
task. This is kind of hacky, though.

(2) is the ideal solution. It is a bit of work, but it might not be so bad.
I think most of the changes would be isolated to the TaskStorageManager.
We'd also need to make the KV store read-only, which is pretty easy to do.
If you're not comfortable with it, though, then (1) would be your next-best
bet.

Cheers,
Chris

On Tue, Apr 7, 2015 at 10:16 AM, Tommy Becker <tobec...@tivo.com> wrote:



We have a Kafka topic containing data needed by several Samza jobs. These
jobs will essentially read the data and build up state that will be used
for processing their inputs. Ideally, we would use the topic as a bootstrap
stream to build up this state. The problem with that is the topic
containing the data has a single partition but the topics these jobs are
processing as input have multiple partitions. So my understanding is that
only one task instance in the job would actually process the bootstrap
stream, and therefore any state it built up would be local to that task. So
I'm thinking my options are the following:

1) Implement SAMZA-353 and allow the bootstrap SSP to be assigned to each
task instance
2) Implement the shared state store component of SAMZA-402
3) Layer the shared state on top of Samza in our tasks themselves, maybe
by using something like RocksDB directly.

Number 1 seems easiest to implement at the cost of having the entire state
duplicated for each task.  I'd prefer not to do number 3 given the
existence of this feature on Samza's roadmap, but I am a bit concerned
about the scope of work with number 2, and the fact that this is mostly
Scala code.

Are there any alternatives that I'm missing?  Note that we need to process
the data stream as a bootstrap stream.  Using it as a changelog is
insufficient because we need to be able to manipulate the data before
building up the state store.

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com









--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com





Re: Review Request 32877: SAMZA-616

2015-04-07 Thread Tommy Becker

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32877/
---

(Updated April 7, 2015, 8:19 p.m.)


Review request for samza.


Bugs: SAMZA-616
https://issues.apache.org/jira/browse/SAMZA-616


Repository: samza


Description
---

Fix for SAMZA-616.  Just noticed my IDE cleaned up whitespace in 
configuration-table.html.  Let me know if that's a problem.


Diffs
-

  docs/learn/documentation/versioned/jobs/configuration-table.html e091460 
  samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 1ca9e2c 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 4098235 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
5416dd6 
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
2a0897f 

Diff: https://reviews.apache.org/r/32877/diff/


Testing (updated)
---

Tested manually


Thanks,

Tommy Becker



Dealing with partitioning mismatches between bootstrap and input streams

2015-04-07 Thread Tommy Becker

We have a Kafka topic containing data needed by several Samza jobs. These jobs 
will essentially read the data and build up state that will be used for 
processing their inputs. Ideally, we would use the topic as a bootstrap stream 
to build up this state. The problem with that is the topic containing the data 
has a single partition but the topics these jobs are processing as input have 
multiple partitions. So my understanding is that only one task instance in the 
job would actually process the bootstrap stream, and therefore any state it 
built up would be local to that task. So I'm thinking my options are the 
following:

1) Implement SAMZA-353 and allow the bootstrap SSP to be assigned to each task 
instance
2) Implement the shared state store component of SAMZA-402
3) Layer the shared state on top of Samza in our tasks themselves, maybe by 
using something like RocksDB directly.

Option 1 seems easiest to implement, at the cost of duplicating the entire 
state for each task.  I'd prefer not to do option 3 given that this feature is 
on Samza's roadmap, but I am a bit concerned about the scope of work for 
option 2, and the fact that it's mostly Scala code.

Are there any alternatives that I'm missing?  Note that we need to process the 
data stream as a bootstrap stream.  Using it as a changelog is insufficient 
because we need to be able to manipulate the data before building up the state 
store.
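For reference, bootstrap consumption is enabled per stream in the job config; a 
minimal sketch (the "kafka" system and stream names here are placeholders, not 
from the original message):

```properties
# Consume both the regular input and the single-partition data topic.
task.inputs=kafka.events,kafka.reference-data

# Fully catch up on this stream before processing any other input.
systems.kafka.streams.reference-data.samza.bootstrap=true

# Optionally prioritize it so it drains first whenever it has messages.
systems.kafka.streams.reference-data.samza.priority=2
```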

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com<http://www.digitalsmiths.com>
tobec...@tivo.com<mailto:tobec...@tivo.com>



This email and any attachments may contain confidential and privileged material 
for the sole use of the intended recipient. Any review, copying, or 
distribution of this email (or any attachments) by others is prohibited. If you 
are not the intended recipient, please contact the sender immediately and 
permanently delete this email and any attachments. No employee or agent of TiVo 
Inc. is authorized to conclude any binding agreement on behalf of TiVo Inc. by 
email. Binding agreements with TiVo Inc. may only be made by a signed written 
agreement.


Re: sending data to a different partitions of the output stream

2015-04-07 Thread Tommy Becker

If you want to send to a specific partition number, you can just pass that 
number as the partition key.  This works because the default partitioner hashes 
the key, and an Integer's hash code is the value itself.
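A minimal self-contained sketch of that idea (illustrative only, not Samza 
source; the class and method names are made up):

```java
// Why passing an Integer partition key routes a message to that exact
// partition: the default partitioner uses the key's hashCode(), and
// Integer.hashCode() returns the value itself.
public class PartitionKeyDemo {

    // Hash-based partition selection, as a default partitioner would do it.
    static int partitionFor(Object partitionKey, int numPartitions) {
        // abs() guards against negative hash codes
        return Math.abs(partitionKey.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 8;
        // An Integer key of 5 lands on partition 5 (5 % 8 == 5).
        System.out.println(partitionFor(5, numPartitions));
        // Any other key type is hashed like a normal object.
        System.out.println(partitionFor("user-42", numPartitions));
    }
}
```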

On 04/07/2015 07:40 AM, Vladimir Lebedev wrote:
Hey,

I cannot find a clear explanation of this in the documentation or in 
hello-samza: how do I tell the collector to send my output data to a particular 
partition of the output stream?

My understanding is that in my process() method I have to create an 
OutgoingMessageEnvelope object, passing not only my deserialized data but also 
my partition key, like this:

[...]
try {
    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "output"),
            my_partition_key, null, my_data));
} catch (Exception e) {
    System.err.println("Unable to parse line: " + event);
}
[...]

The question is: who is responsible for computing the partition number from 
my_partition_key? How, for example, can I establish some kind of consistent 
hashing mechanism for computing the partition number from the key? Is it 
configurable somehow via task properties, as it is in Kafka via the 
partitioner.class property?

Many thanks in advance,

Vladimir

--
Vladimir Lebedev
http://linkedin.com/in/vlebedev


--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com<http://www.digitalsmiths.com>
tobec...@tivo.com<mailto:tobec...@tivo.com>





Review Request 32878: SAMZA-634

2015-04-06 Thread Tommy Becker

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32878/
---

Review request for samza.


Bugs: samza-634
https://issues.apache.org/jira/browse/samza-634


Repository: samza


Description
---

Fix for SAMZA-634 - Stop JobCoordinator when ProcessJob exits.


Diffs
-

  samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala 7992885 
  samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 
b80d349 
  samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala 
18eedc0 

Diff: https://reviews.apache.org/r/32878/diff/


Testing
---


Thanks,

Tommy Becker



Review Request 32877: SAMZA-616

2015-04-06 Thread Tommy Becker

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32877/
---

Review request for samza.


Bugs: SAMZA-616
https://issues.apache.org/jira/browse/SAMZA-616


Repository: samza


Description
---

Fix for SAMZA-616.  Just noticed my IDE cleaned up whitespace in 
configuration-table.html.  Let me know if that's a problem.


Diffs
-

  docs/learn/documentation/versioned/jobs/configuration-table.html e091460 
  samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 1ca9e2c 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 4098235 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
5416dd6 
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
2a0897f 

Diff: https://reviews.apache.org/r/32877/diff/


Testing
---


Thanks,

Tommy Becker



Re: SamzaException: no job factory class defined

2015-03-18 Thread Tommy Becker
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class job1 implements StreamTask {
    private final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "beste");

    public void process(IncomingMessageEnvelope envelope,
            MessageCollector collector, TaskCoordinator coordinator) {
        String msg = (String) envelope.getMessage();
        String outmsg = msg;
        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outmsg));
    }
}

I have been trying to read the code in JobRunner.scala, which apparently is the
one generating the exception, and as I understand it is where the problem
occurs. I am not really sure if the problem is with the task.class definition
or if I still have something missing in the system.

Thanks in advance,

   Jordi

Jordi Blasi Uribarri
Área I+D+i

jbl...@nextel.es<mailto:jbl...@nextel.es>
Oficina Bilbao

[http://www.nextel.es/wp-content/uploads/Firma_Nextel_2014.png]



--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com<http://www.digitalsmiths.com>
tobec...@tivo.com<mailto:tobec...@tivo.com>





Review Request 32202: SAMZA-456 - Add samza-yarn jar to multi-node tutorial.

2015-03-18 Thread Tommy Becker

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32202/
---

Review request for samza.


Repository: samza


Description
---

Fix for SAMZA-456 - Add samza-yarn jar to multi-node tutorial.


Diffs
-

  docs/learn/tutorials/versioned/run-in-multi-node-yarn.md 969152b 

Diff: https://reviews.apache.org/r/32202/diff/


Testing
---


Thanks,

Tommy Becker



Re: Review Request 32102: SAMZA-589

2015-03-16 Thread Tommy Becker

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32102/
---

(Updated March 16, 2015, 2:51 p.m.)


Review request for samza.


Changes
---

Address review comments


Repository: samza


Description
---

SAMZA-589


Diffs (updated)
-

  build.gradle 113da2a 
  docs/learn/documentation/versioned/jobs/configuration.md e094f60 
  samza-api/src/main/java/org/apache/samza/config/Config.java 9f7ade0 
  samza-api/src/main/java/org/apache/samza/config/MapConfig.java 1a83923 
  samza-api/src/test/java/org/apache/samza/config/TestConfig.java b4100c2 
  
samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
 0afee64 
  
samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
 734d9d2 

Diff: https://reviews.apache.org/r/32102/diff/


Testing
---

Fix for SAMZA-589 - Need a way to flag sensitive information in Config.

This change adds a sanitize() method to Config that returns a new Config with 
values for keys beginning with "sensitive." masked.  Config.toString() now 
operates on a sanitized copy.  AM Web UI and API endpoints also use sanitized 
values.
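A minimal illustration of the masking behavior described above (this is an 
illustrative sketch, not the actual Samza implementation; class and key names 
are made up):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Mask the values of config keys that start with "sensitive.", leaving
// other entries untouched, so logs and web UIs never see the raw values.
public class ConfigSanitizer {
    static final String SENSITIVE_PREFIX = "sensitive.";
    static final String MASK = "********";

    static Map<String, String> sanitize(Map<String, String> config) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : config.entrySet()) {
            out.put(e.getKey(),
                    e.getKey().startsWith(SENSITIVE_PREFIX) ? MASK : e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> cfg = new LinkedHashMap<>();
        cfg.put("sensitive.db.password", "hunter2");        // gets masked
        cfg.put("task.class", "com.example.MyTask");        // passes through
        System.out.println(sanitize(cfg));
    }
}
```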


Thanks,

Tommy Becker



Review Request 32102: SAMZA-589

2015-03-16 Thread Tommy Becker

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32102/
---

Review request for samza.


Repository: samza


Description
---

SAMZA-589


Diffs
-

  build.gradle 87334fa 
  samza-api/src/main/java/org/apache/samza/config/Config.java 9f7ade0 
  samza-api/src/main/java/org/apache/samza/config/MapConfig.java 1a83923 
  samza-api/src/test/java/org/apache/samza/config/TestConfig.java b4100c2 
  
samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
 0afee64 
  
samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
 734d9d2 

Diff: https://reviews.apache.org/r/32102/diff/


Testing
---

Fix for SAMZA-589 - Need a way to flag sensitive information in Config.

This change adds a sanitize() method to Config that returns a new Config with 
values for keys beginning with "sensitive." masked.  Config.toString() now 
operates on a sanitized copy.  AM Web UI and API endpoints also use sanitized 
values.


Thanks,

Tommy Becker



Storing sensitive data in the Config

2015-03-09 Thread Tommy Becker

We have some sensitive information that we are currently storing in the Samza 
config.  Our ops guys have some concern regarding where the config is displayed 
(e.g. in logs, app master UI, etc).  I'm curious if others have had similar 
concerns and if so what you did about it.  Seems like we might be able to use 
system properties for these things, albeit at a significant cost to 
convenience.  It would be nice if it were possible to mark config values as 
sensitive (perhaps via some sort of naming convention), and have such values be 
retrievable only via an explicit get on the key, so these sorts of incidental 
exposures can't happen.

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com<http://www.digitalsmiths.com>
tobec...@tivo.com<mailto:tobec...@tivo.com>





RE: [DISCUSS] JDK7

2015-02-18 Thread Tommy Becker
I just added this comment to SAMZA-455, since I was listed as requesting JDK 6 
support:

We have recently migrated to JDK 8, so you may consider my request for JDK 6 
support withdrawn (though it was really more of a clarification of whether JDK 
6 is supported than a request for such support). Personally, I feel that 
requiring JDK 7 is perfectly fine at this stage. Enterprise customers will not 
move until projects start requiring it, so that argument works both ways.


From: Julian Hyde [jul...@hydromatic.net]
Sent: Wednesday, February 18, 2015 1:25 PM
To: dev@samza.apache.org
Subject: Re: [DISCUSS] JDK7

Another data point. Calcite just dropped support for JDK 1.6. Calcite 1.0 
supports 1.6, 1.7, 1.8, but Calcite 1.1 will only support 1.7, 1.8. We could be 
persuaded to reconsider.

Julian

> On Feb 18, 2015, at 09:40, Chris Riccomini  wrote:
>
> Hey all,
>
> Ruslan has been working on upgrading Samza to the latest RocksDB build.
>
>  https://issues.apache.org/jira/browse/SAMZA-442
>
> During this migration, we realized that upgrading Samza to a modern version
> of RocksDB will require JDK 7, since RocksDB requires it to build. Without
> this upgrade, we will not be able to take in TTL support in RocksDB, as
> well as any new features they release.
>
> Hadoop 2.7 (YARN) will also begin a vote in several weeks. This version
> mandates JDK7 be used at compile time:
>
>  https://issues.apache.org/jira/browse/HADOOP-10530
>
> Scala 2.11 requires JDK7+ transitively for us, since Scalatra 2.3 is built
> against JDK 7.
>
>  https://issues.apache.org/jira/browse/SAMZA-469
>
> We also discussed migrating to JDK 7 here:
>
>  https://issues.apache.org/jira/browse/SAMZA-455
>
> And here:
>
>
> http://mail-archives.apache.org/mod_mbox/incubator-samza-dev/201412.mbox/%3cd0ac655a.38469%25criccom...@linkedin.com%3E
>
> Taken in isolation, each one of these is potentially fixable. We could try
> and convince the RocksDB community to move back to JDK 6. We could punt on
> upgrading YARN for a year or more. The same goes for Scala 2.11. This
> doesn't seem ideal, and it's just going to get worse.
>
> I propose that we keep 0.9.0 on JDK6 and require JDK7 for 0.10.0. We can
> target an 0.9.0 release for March, and an 0.10.0 release for June. If we
> move to a more aggressive release schedule (e.g. monthly), then we'll
> require the JDK7 migration at the June release (but no earlier).
>
> Cheers,
> Chris





RE: Not safe to access KV stores from InitableTask.init()

2015-02-16 Thread Tommy Becker
Thanks Chris.

https://issues.apache.org/jira/browse/SAMZA-567

I'm working remotely today and probably tomorrow, so you'll probably beat me to 
it ;)

From: Chris Riccomini [criccom...@apache.org]
Sent: Monday, February 16, 2015 2:39 PM
To: dev@samza.apache.org
Subject: Re: Not safe to access KV stores from InitableTask.init()

Hey Tommy,

This sounds broken. Let me have a look and see if there's an easy fix. I
*think* reordering should work, but I just want to make sure.

Could you open a JIRA and set the fixed version to 0.9.0? I'll take a looks
today/tomorrow. If you want to test out reordering it, please share any
findings. :)

Cheers,
Chris

On Monday, February 16, 2015, Tommy Becker  wrote:

> I have need to do some initial processing of the entries in my KV store on
> startup before processing messages.  I put the code into my task's init()
> method, and although it worked with an empty KV store/changelog once I have
> entries in there it bombs with a rather obscure exception:
>
> java.util.NoSuchElementException: key not found: TaskName-Partition 3
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> ~[scala-library-2.10.1.jar:na]
> at scala.collection.AbstractMap.default(Map.scala:58)
> ~[scala-library-2.10.1.jar:na]
> at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
> ~[scala-library-2.10.1.jar:na]
> at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:71)
> ~[samza-core_2.10-0.8.0.jar:na]
> at
> org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61)
> ~[samza-core_2.10-0.8.0.jar:na]
> at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala:72)
> ~[samza-kv_2.10-0.8.0.jar:na]
> at
> org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:57)
> ~[samza-kv_2.10-0.8.0.jar:na]
> at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:159)
> ~[samza-kv_2.10-0.8.0.jar:na]
> at
> org.apache.samza.storage.kv.CachedStore$$anon$1.removeEldestEntry(CachedStore.scala:69)
> ~[samza-kv_2.10-0.8.0.jar:na]
> at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
> ~[na:1.8.0_25]
> at java.util.HashMap.putVal(HashMap.java:663) ~[na:1.8.0_25]
> at java.util.HashMap.put(HashMap.java:611) ~[na:1.8.0_25]
> at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:91)
> ~[samza-kv_2.10-0.8.0.jar:na]
> at
> org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
> ~[samza-kv_2.10-0.8.0.jar:na]
> at
> org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
> ~[samza-kv_2.10-0.8.0.jar:na]
> ...
>
> After some investigation I see that it's actually not safe to interact
> with anything that is going to potentially produce messages from init(),
> since startTask is called before startProducers in SamzaContainer.run.  In
> retrospect I guess that is why a MessageCollector is not passed to init()
> but of course writes to the KV store result in messages being sent to the
> changelog :/  I guess my question is whether or not this is intended
> behavior (could we not simply initialize producers before tasks) and if so,
> what an alternative might be for my use case.  As it is currently it seems
> like all I can do is add an "initProcessingDone" flag to my task and check
> it every time a message comes in.
>
> 
>
>





Not safe to access KV stores from InitableTask.init()

2015-02-16 Thread Tommy Becker
I have need to do some initial processing of the entries in my KV store on 
startup before processing messages.  I put the code into my task's init() 
method, and although it worked with an empty KV store/changelog once I have 
entries in there it bombs with a rather obscure exception:

java.util.NoSuchElementException: key not found: TaskName-Partition 3
at scala.collection.MapLike$class.default(MapLike.scala:228) 
~[scala-library-2.10.1.jar:na]
at scala.collection.AbstractMap.default(Map.scala:58) 
~[scala-library-2.10.1.jar:na]
at scala.collection.mutable.HashMap.apply(HashMap.scala:64) 
~[scala-library-2.10.1.jar:na]
at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:71) 
~[samza-core_2.10-0.8.0.jar:na]
at 
org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61)
 ~[samza-core_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala:72) 
~[samza-kv_2.10-0.8.0.jar:na]
at 
org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:57)
 ~[samza-kv_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:159) 
~[samza-kv_2.10-0.8.0.jar:na]
at 
org.apache.samza.storage.kv.CachedStore$$anon$1.removeEldestEntry(CachedStore.scala:69)
 ~[samza-kv_2.10-0.8.0.jar:na]
at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299) 
~[na:1.8.0_25]
at java.util.HashMap.putVal(HashMap.java:663) ~[na:1.8.0_25]
at java.util.HashMap.put(HashMap.java:611) ~[na:1.8.0_25]
at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:91) 
~[samza-kv_2.10-0.8.0.jar:na]
at 
org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
 ~[samza-kv_2.10-0.8.0.jar:na]
at 
org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
 ~[samza-kv_2.10-0.8.0.jar:na]
...

After some investigation I see that it's actually not safe to interact with 
anything that is going to potentially produce messages from init(), since 
startTask is called before startProducers in SamzaContainer.run.  In retrospect 
I guess that is why a MessageCollector is not passed to init() but of course 
writes to the KV store result in messages being sent to the changelog :/  I 
guess my question is whether or not this is intended behavior (could we not 
simply initialize producers before tasks) and if so, what an alternative might 
be for my use case.  As it is currently it seems like all I can do is add an 
"initProcessingDone" flag to my task and check it every time a message comes in.
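The flag-based workaround mentioned at the end can be sketched as follows 
(hypothetical minimal types, not the real Samza API): defer any store-touching 
initialization until the first process() call, by which point the container's 
producers have been started.

```java
// Defer init work that may write to the changelog until the first
// process() call, and run it exactly once.
public class LazyInitTask {
    private boolean initProcessingDone = false;
    private int initRuns = 0;  // counter just to make the behavior visible

    private void ensureInitProcessing() {
        if (!initProcessingDone) {
            // ... safe place to scan/write the KV store ...
            initRuns++;
            initProcessingDone = true;
        }
    }

    public void process(Object envelope) {
        ensureInitProcessing();  // deferred init runs exactly once
        // ... normal message handling ...
    }

    public int getInitRuns() { return initRuns; }

    public static void main(String[] args) {
        LazyInitTask task = new LazyInitTask();
        task.process("first message");
        task.process("second message");
        System.out.println(task.getInitRuns());  // prints 1
    }
}
```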





Re: Testing dev@samza.apache.org

2015-01-23 Thread Tommy Becker

Yes.

On 01/23/2015 12:49 PM, Chris Riccomini wrote:

Hey all,

Could you please confirm that you're seeing this? I'm trying to verify the
TLP migration for:

   https://issues.apache.org/jira/browse/INFRA-9055

Cheers,
Chris




