>> You are correct that this is focused on the higher-level API but doesn't
preclude using the lower-level API. I was at the same point you were not
long ago, in fact, and had a very productive conversation on the list

Thanks Tom for linking the thread, and I'm glad that you were able to get
Kubernetes integration working with Samza.

>> If it is helpful for everyone, once I get the low-level API +
ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate an
additional sample for hello-samza.

@Thunder Stumpges:
We'd be thrilled to receive your contribution. Examples, demos, and
tutorials contribute a great deal to improving the ease of use of Apache
Samza. I'm happy to shepherd design discussions and code reviews in the
open-source community, including answering any questions you may have.


>> One thing I'm still curious about, is what are the drawbacks or
complexities of leveraging the Kafka High-level consumer +
PassthroughJobCoordinator in a stand-alone setup like this? We do have
Zookeeper (because of kafka) so I think either would work. The Kafka
High-level consumer comes with other nice tools for monitoring offsets,
lag, etc


@Thunder Stumpges:

Samza uses a "Job-Coordinator" to assign your input partitions among the
different instances of your application so that they don't overlap. A
typical way to solve this "partition distribution" problem is to have a
single instance elected as a "leader" and have the leader assign partitions
to the group. The ZkJobCoordinator uses ZooKeeper primitives to achieve
this, while the YarnJC relies on Yarn's guarantee that there will be a
singleton AppMaster.
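
To make the leader-based approach concrete, here is a minimal sketch in plain
Java. It is not Samza code: the class and method names are hypothetical, and
the election rule (smallest processor id wins) is only an assumption chosen
to keep the example short. The real ZkJobCoordinator relies on ZooKeeper for
election and membership.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; none of these names are Samza APIs.
final class LeaderAssignmentSketch {
    private LeaderAssignmentSketch() {}

    /** Assumed election rule: the lexicographically smallest processor id leads. */
    static String electLeader(List<String> liveProcessors) {
        if (liveProcessors.isEmpty()) {
            throw new IllegalArgumentException("no live processors");
        }
        return Collections.min(liveProcessors);
    }

    /** The leader spreads partitions 0..numPartitions-1 round-robin over the sorted group. */
    static Map<String, List<Integer>> assign(List<String> liveProcessors, int numPartitions) {
        if (liveProcessors.isEmpty()) {
            throw new IllegalArgumentException("no live processors");
        }
        List<String> sorted = new ArrayList<>(liveProcessors);
        Collections.sort(sorted);
        Map<String, List<Integer>> plan = new TreeMap<>();
        for (String processor : sorted) {
            plan.put(processor, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            // Each partition lands on exactly one processor, so assignments never overlap.
            plan.get(sorted.get(partition % sorted.size())).add(partition);
        }
        return plan;
    }
}
```

When a processor joins or leaves, the leader would recompute the plan for the
new group and publish it to the others.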

A key difference that separates the PassthroughJC from the Yarn/Zk variants
is that it does _not_ attempt to solve the "partition distribution" problem.
As a result, there's no leader election involved. Instead, it pushes the
problem of "partition distribution" to the underlying consumer.

The PassthroughJC supports these two scenarios:

*1. Consumer-managed partition distribution:* When using the Kafka
high-level consumer (or an AWS KinesisClientLibrary consumer) with Samza,
the consumer manages partitions internally.

*2. Static partition distribution:* Alternatively, partitions can be managed
statically using configuration. You can achieve static partition assignment
by implementing a custom
*SystemStreamPartitionGrouper
<https://samza.apache.org/learn/documentation/0.8/api/javadocs/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.html>*
 and *TaskNameGrouper
<https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java>*.
Solutions in this category will typically require you to distinguish the
various processors in the group by providing an "id" for each. Once the
"id"s are decided, you can then statically compute assignments using a
function (e.g., modulo N). You can rely on the following mechanisms to
provide this id:
 - Configure each instance differently to have its own id.
 - Obtain the id from the cluster manager. For instance, Kubernetes can
   provide each pod a unique id in the range [0,N) (for example, the ordinal
   of a StatefulSet pod). AWS ECS should expose similar capabilities via a
   REST endpoint.
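
As a rough illustration of the static scheme, the sketch below computes which
partitions a processor would own under "partition modulo N". It is plain Java
with hypothetical names, not an implementation of the Samza grouper
interfaces. It also assumes the id is taken from a pod name of the form
<name>-<ordinal>, which is how Kubernetes StatefulSets name their pods; any
other id source would work equally well.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Samza: illustrates static modulo-N assignment.
final class StaticPartitionAssignment {
    private StaticPartitionAssignment() {}

    /** Extracts the trailing ordinal from a name such as "my-job-3" (assumed StatefulSet-style pod name). */
    static int ordinalFromPodName(String podName) {
        int dash = podName.lastIndexOf('-');
        if (dash < 0 || dash == podName.length() - 1) {
            throw new IllegalArgumentException("No ordinal suffix in: " + podName);
        }
        return Integer.parseInt(podName.substring(dash + 1));
    }

    /** Returns the partitions p in [0, numPartitions) with p % numProcessors == id. */
    static List<Integer> partitionsFor(int id, int numProcessors, int numPartitions) {
        if (numProcessors <= 0 || id < 0 || id >= numProcessors) {
            throw new IllegalArgumentException("id must be in [0, numProcessors)");
        }
        List<Integer> owned = new ArrayList<>();
        for (int partition = id; partition < numPartitions; partition += numProcessors) {
            owned.add(partition);
        }
        return owned;
    }
}
```

Every instance can evaluate partitionsFor(...) locally with its own id, so no
coordination is needed as long as all instances agree on N and on the number
of partitions.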

>> One thing I'm still curious about, is what are the drawbacks or
complexities of leveraging the Kafka High-level consumer +
PassthroughJobCoordinator in a stand-alone setup like this?

*Leveraging the Kafka High-level consumer:*

The Kafka high-level consumer is not integrated into Samza just yet.
Instead, Samza's integration with Kafka uses the low-level consumer because:
i) It allows for greater control in fetching data from individual brokers.
It is simple and performant in terms of the threading model to have one
thread pull from each broker.
ii) It is efficient in memory utilization since it does not do internal
buffering of messages.
iii) There's no overhead like Kafka-controller heartbeats that are driven by
*consumer.poll*.

Since there's no built-in integration, you will have to build a new
*SystemConsumer* if you need to integrate with the Kafka high-level
consumer. Further, there's a fair bit more complexity to manage in
checkpointing.

>> The Kafka High-level consumer comes with other nice tools for monitoring
offsets, lag, etc

Samza exposes
<https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala>
 the below metrics for lag-monitoring:
- The current log-end offset for each partition
- The last checkpointed offset for each partition
- The number of messages behind the high watermark of the partition
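
If you export the first two metrics to your own tooling, a per-partition lag
can be derived from them. The sketch below is plain Java with hypothetical
names and does not read Samza's metrics registry. It assumes lag is defined
as log-end offset minus last checkpointed offset, and that a partition with
no checkpoint yet is treated as offset 0.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; the input maps stand in for metric values you have already collected.
final class LagSketch {
    private LagSketch() {}

    /** Per-partition lag = log-end offset - last checkpointed offset, floored at zero. */
    static Map<Integer, Long> lag(Map<Integer, Long> logEndOffsets,
                                  Map<Integer, Long> checkpointedOffsets) {
        Map<Integer, Long> result = new TreeMap<>();
        for (Map.Entry<Integer, Long> entry : logEndOffsets.entrySet()) {
            // Assumption: a missing checkpoint counts as offset 0.
            long checkpointed = checkpointedOffsets.getOrDefault(entry.getKey(), 0L);
            result.put(entry.getKey(), Math.max(0L, entry.getValue() - checkpointed));
        }
        return result;
    }
}
```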

Please let us know if you need help discovering these or integrating these
with other systems/tools.


*Leveraging the Passthrough JobCoordinator:*

It's helpful to split this discussion on tradeoffs with PassthroughJC into
2 parts:

1. PassthroughJC + consumer-managed partitions:

- In this model, Samza has no control over partition assignment since it's
managed by the consumer. This means that stateful operations like joins that
rely on partitions being co-located on the same task will not work. Simple
stateless operations (e.g., map, filter, remote lookups) are fine.

- A key differentiator between Samza and other frameworks is our support
for "host affinity
<https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html>".
Samza achieves this by assigning partitions to hosts taking data locality
into account. If the consumer can arbitrarily shuffle partitions, it'd be
hard to support this affinity/locality. Often this is a key optimization
when dealing with large stateful jobs.

2. PassthroughJC + static partitions:

- In this model, it is possible to make stateful processing (including host
affinity) work by carefully choosing how "id"s are assigned and computed.
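
To illustrate why co-location matters for joins, the sketch below checks
whether two input streams have the same partition numbers on every processor.
It is plain Java with hypothetical names, not a Samza API. With a static
scheme that applies the same function to both inputs the check holds by
construction; with independently consumer-managed assignments it may not.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical check; each map goes from processor id to the partitions it owns for one stream.
final class CoLocationCheck {
    private CoLocationCheck() {}

    /** True when every processor owns the same partition numbers of both inputs. */
    static boolean coLocated(Map<String, Set<Integer>> left, Map<String, Set<Integer>> right) {
        Set<String> processors = new HashSet<>(left.keySet());
        processors.addAll(right.keySet());
        for (String processor : processors) {
            Set<Integer> leftOwned = left.getOrDefault(processor, Collections.<Integer>emptySet());
            Set<Integer> rightOwned = right.getOrDefault(processor, Collections.<Integer>emptySet());
            if (!leftOwned.equals(rightOwned)) {
                return false; // A join on this processor would miss matching keys.
            }
        }
        return true;
    }
}
```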

*Recommendation:*

- Owing to the above subtleties, I would recommend that we give the
ZkJobCoordinator + the built-in low-level Kafka integration a try.
- If we hit snags down this path, we can certainly explore the approach
with PassthroughJC + static partitions.
- Using the PassthroughJC + consumer-managed distribution would be the least
preferable option owing to the subtleties I outlined above.

Please let us know should you have more questions.

Best,
Jagdish

On Wed, Mar 14, 2018 at 9:24 PM, Thunder Stumpges <tstump...@ntent.com>
wrote:

> Wow, what great timing, and what a great thread! I definitely have some
> good starters to go off of here.
>
> If it is helpful for everyone, once I get the low-level API +
> ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate an
> additional sample for hello-samza.
>
> One thing I'm still curious about, is what are the drawbacks or
> complexities of leveraging the Kafka High-level consumer +
> PassthroughJobCoordinator in a stand-alone setup like this? We do have
> Zookeeper (because of kafka) so I think either would work. The Kafka
> High-level consumer comes with other nice tools for monitoring offsets,
> lag, etc....
>
> Thanks guys!
> -Thunder
>
> -----Original Message-----
> From: Tom Davis [mailto:t...@recursivedream.com]
> Sent: Wednesday, March 14, 2018 17:50
> To: dev@samza.apache.org
> Subject: Re: Old style "low level" Tasks with alternative deployment
> model(s)
>
> Hey there!
>
> You are correct that this is focused on the higher-level API but doesn't
> preclude using the lower-level API. I was at the same point you were not
> long ago, in fact, and had a very productive conversation on the list:
> you should look for "Question about custom StreamJob/Factory" in the list
> archive for the past couple months.
>
> I'll quote Jagadish Venkatraman from that thread:
>
> > For the section on the low-level API, can you use
> > LocalApplicationRunner#runTask()? It basically creates a new
> > StreamProcessor and runs it. Remember to provide task.class and set it
> > to your implementation of StreamTask or AsyncStreamTask. Please note
> > that this is an evolving API and hence, subject to change.
>
> I ended up just switching to the high-level API because I don't have any
> existing Tasks and the Kubernetes story is a little more straight forward
> there (there's only one container/configuration to deploy).
>
> Best,
>
> Tom
>
> Thunder Stumpges <tstump...@ntent.com> writes:
>
> > Hi all,
> >
> > We are using Samza (0.12.0) in about 2 dozen jobs implementing several
> > processing pipelines. We have also begun a significant move of other
> > services within our company to Docker/Kubernetes. Right now our
> > Hadoop/Yarn cluster has a mix of stream and batch "Map Reduce" jobs
> > (many reporting and other batch processing jobs). We would really like to
> > move our stream processing off of Hadoop/Yarn and onto Kubernetes.
> >
> > When I just read about some of the new progress in .13 and .14 I got
> > really excited! We would love to have our jobs run as simple libraries
> > in our own JVM, and use the Kafka High-Level-Consumer for partition
> > distribution and such. This would let us "dockerfy" our application and
> > run/scale in kubernetes.
> >
> > However as I read it, this new deployment model is ONLY for the
> > new(er) High Level API, correct? Is there a plan and/or resources for
> > adapting this back to existing low-level tasks? How complicated of a
> > task is that? Do I have any other options to make this transition easier?
> >
> > Thanks in advance.
> > Thunder
>



-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
