Re: Executing Samza jobs natively in Kubernetes

2015-12-14 Thread Jagadish Venkatraman
Hi Elias,

Thank you so much for your explanation. I'm looking into improving Samza
Standalone, and your design inputs were very useful.

Cheers,
Jagadish

On Mon, Dec 14, 2015 at 12:47 PM, Elias Levy 
wrote:

> On Mon, Dec 14, 2015 at 11:55 AM, Jagadish Venkatraman <
> jagadish1...@gmail.com> wrote:
>
> >
> > Thanks for the great work! This is super-helpful. Another cool feature is
> > that this implementation pushes a lot of failure handling/restarting to
> > the cluster manager.
> >
> > I have some questions on the implementation.
> >
> > 1. It's my understanding that each container is a separate pod. When a pod
> > crashes, I assume the *Kubelet* would bring the job back up on the **same**
> > node (and we can reuse state, since the volume is mounted as an *emptyDir*
> > volume)? This is valuable for stateful jobs.
> >
>
> Correct.  I create an emptyDir state volume on purpose, so that if the
> Docker container dies for some reason, the local state won't go away and
> will be available to the Samza container when it is restarted by Kubelet.
> I do the same thing for the logs, so you can fetch them and debug any
> issues that may be causing the container to fail.
>
>
>
> > 2. Also, the containerId and the jobModel are passed to the pod as
> > environment variables. How do we ensure that, after a restart, the pods
> > that come up (potentially on a different host) have the same containerId?
> > (Does Kubernetes help here by launching the process with the same
> > environment variables as the original run?)
> >
> > This behavior is key to ensuring that we process all partitions, with no
> > two containers processing the same partition.
> >
>
> At the moment Kubernetes does not have a specific abstraction to model
> stateful services that have unique identities within the cluster (e.g. a
> Samza job, a Kafka cluster, a ZooKeeper ensemble), but it is easy to work
> around this limitation.  You do so by creating one ReplicationController,
> with a replica count of one, per group member.  In the case of a Samza job,
> my code creates an RC per SamzaContainer.  Each RC has a slightly different
> Pod spec.  Each Pod is parametrized with a unique Samza container ID via the
> SAMZA_CONTAINER_ID environment variable.  So the Kubernetes YAML the code
> outputs ensures there is only one RC per container ID, and the RC ensures
> there is only ever a single container running for its given configuration.
>
> Make sense?
>



-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University


Re: Executing Samza jobs natively in Kubernetes

2015-12-14 Thread Jagadish Venkatraman
Hey Elias,

Thanks for the great work! This is super-helpful. Another cool feature is
that this implementation pushes a lot of failure handling/restarting to the
cluster manager.

I have some questions on the implementation.

1. It's my understanding that each container is a separate pod. When a pod
crashes, I assume the *Kubelet* would bring the job back up on the **same**
node (and we can reuse state, since the volume is mounted as an *emptyDir*
volume)? This is valuable for stateful jobs.

2. Also, the containerId and the jobModel are passed to the pod as
environment variables. How do we ensure that, after a restart, the pods that
come up (potentially on a different host) have the same containerId? (Does
Kubernetes help here by launching the process with the same environment
variables as the original run?)

This behavior is key to ensuring that we process all partitions, with no two
containers processing the same partition.

Cheers,
Jagadish



On Sun, Nov 29, 2015 at 11:18 AM, Elias Levy 
wrote:

> I've been exploring Samza for stream processing as well as Kubernetes as a
> container orchestration system and I wanted to be able to use one with the
> other.  The prospect of having to execute YARN either alongside or on top
> of Kubernetes did not appeal to me, so I developed a KubernetesJob
> implementation of SamzaJob.
>
> You can find the details at https://github.com/eliaslevy/samza_kubernetes,
> but in summary, when executed, KubernetesJob generates a serialized JobModel.
> Instead of interacting with Kubernetes directly to create the
> SamzaContainers (as the YarnJob's SamzaApplicationMaster may do with the
> YARN RM), it outputs a config YAML file that can be used to create the
> SamzaContainers in Kubernetes using ReplicationControllers.  For this you
> need to package your job as a Docker image.  See the README at the above
> repo for details.
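>
> In practice the flow is roughly: build and push your job's Docker image, run
> KubernetesJob once to emit the YAML, and then create the controllers with
> something like the following (the file name here is illustrative):
>
>   kubectl create -f samza-job.yaml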
>
> A few observations:
>
> It would be useful if SamzaContainer accepted the JobModel via an
> environment variable.  Right now it expects a URL to download it from.  I
> get around this by using an entry point script that copies the model from an
> environment variable into a file, then passes a file URL to SamzaContainer.
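>
> For illustration, here is a minimal sketch of what that entry point amounts
> to, written inline as the container command in the Pod spec (the variable
> names, paths, and script name are illustrative, not necessarily what the
> repo actually uses):
>
>   containers:
>   - name: samza-container
>     image: my-samza-job:latest
>     command: ["/bin/sh", "-c"]
>     args:
>     - |
>       # Write the serialized JobModel from the environment into a file,
>       # then point SamzaContainer at it with a file URL.
>       echo "$SAMZA_JOB_MODEL" > /tmp/job-model.json
>       export SAMZA_COORDINATOR_URL="file:///tmp/job-model.json"
>       exec bin/run-container.sh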
>
> SamzaContainer doesn't allow you to configure the JMX port.  It selects a
> port at random from the ephemeral range, as it expects to execute in YARN,
> where a static port could result in a conflict.  This is not the case in
> Kubernetes where each Pod (i.e. SamzaContainer) is given its own IP
> address.
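>
> In Kubernetes a fixed JMX port would be safe, and could simply be declared
> on the container, e.g. (the port number is illustrative):
>
>   ports:
>   - name: jmx
>     containerPort: 5555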
>
> This implementation doesn't provide a Samza dashboard, which in the YARN
> implementation is hosted in the Application Master.  The dashboard didn't
> seem to provide much value beyond what the Kubernetes tools for monitoring
> pods already offer.
>
> I've successfully executed the hello-samza jobs in Kubernetes:
>
> $ kubectl get po
> NAME                       READY     STATUS    RESTARTS   AGE
> kafka-1-jjh8n              1/1       Running   0          2d
> kafka-2-buycp              1/1       Running   0          2d
> kafka-3-tghkp              1/1       Running   0          2d
> wikipedia-feed-0-4its2     1/1       Running   0          1d
> wikipedia-parser-0-l0onv   1/1       Running   0          17h
> wikipedia-parser-1-crrxh   1/1       Running   0          17h
> wikipedia-parser-2-1c5nn   1/1       Running   0          17h
> wikipedia-stats-0-3gaiu    1/1       Running   0          16h
> wikipedia-stats-1-j5qlk    1/1       Running   0          16h
> wikipedia-stats-2-2laos    1/1       Running   0          16h
> zookeeper-1-1sb4a          1/1       Running   0          2d
> zookeeper-2-dndk7          1/1       Running   0          2d
> zookeeper-3-46n09          1/1       Running   0          2d
>
>
> Finally, accessing services within the Kubernetes cluster from the outside
> is quite cumbersome unless one uses an external load balancer.  This makes
> it difficult to bootstrap a job, as SamzaJob must connect to ZooKeeper and
> Kafka to find out the number of partitions on the topics it will subscribe
> to, so it can assign them statically among the number of containers
> requested.
>
> Ideally Samza would operate along the lines of the Kafka high-level
> consumer, which dynamically coordinates the allocation of work among members
> of a consumer group.  This would do away with the need to execute SamzaJob a
> priori to generate the JobModel to pass to the SamzaContainers.  It would
> also allow for dynamically changing the number of containers without having
> to shut down the job.
>



-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University


Re: Random connection errors

2015-12-14 Thread Yi Pan
Hi, Kishore,

First, which version of Samza are you running? And if you can attach the log
and config of your container (I assume the log you attached here is a
container log?), it would be very helpful.

Thanks a lot!

-Yi

On Mon, Dec 14, 2015 at 5:07 AM, Kishore N C  wrote:

> Hi,
>
> I have a 25-node Samza cluster, and I am running a job on a dataset of a
> billion records that is backed by a 7-node Kafka cluster.
>
> Some of the tasks on some of the Samza nodes don't seem to start at all
> (while other tasks run fine on other nodes). The specific error message I
> see in the task log is:
>
> 2015-12-14 12:50:50 ClientUtils$ [INFO] Fetching metadata from broker
> id:5,host:10.181.18.87,port:9082 with correlation id 0 for 2 topic(s)
> Set(TOPIC_ONE, TOPIC_TWO)
> 2015-12-14 12:50:50 SyncProducer [INFO] Connected to 10.181.18.87:9082 for
> producing
> 2015-12-14 12:50:50 SyncProducer [INFO] Disconnecting from
> 10.181.18.87:9082
> 2015-12-14 12:51:22 SimpleConsumer [INFO] Reconnect due to socket error:
> java.nio.channels.ClosedChannelException
>
> Sometimes, there is a variation like this:
>
> 2015-12-14 13:05:47 ClientUtils$ [WARN] Fetching topic metadata with
> correlation id 0 for topics [Set(TOPIC_ONE, TOPIC_TWO)] from
> broker [id:6,host:10.181.18.193,port:9082] failed
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> at
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
> at
>
> org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:37)
> at
>
> org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:214)
> at
>
> org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2$$anonfun$6.apply(KafkaSystemAdmin.scala:158)
> at
>
> org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2$$anonfun$6.apply(KafkaSystemAdmin.scala:158)
> at
>
> org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:52)
> at
>
> org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2.apply(KafkaSystemAdmin.scala:155)
> at
>
> org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2.apply(KafkaSystemAdmin.scala:154)
> at
>
> org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>
> The above log just keeps looping and the task never starts processing
> input. I was able to telnet to the host/port combination from the same
> machine. Any ideas/pointers as to what could be going wrong are greatly
> appreciated.
>
> Thanks,
>
> KN.
>


Review Request 41365: SAMZA-838: negative rocksdb.ttl.ms is not handled correctly

2015-12-14 Thread Tao Feng

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/41365/
---

Review request for samza.


Repository: samza


Description
---

RocksDB TTL is handled per RocksDB
semantics (https://github.com/facebook/rocksdb/wiki/Time-to-Live), in which a
zero or negative TTL is the same as an infinite TTL.


Diffs
-----

  
  samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala 211fc3be1e168f1f92812406785b39b5a3fd9174

Diff: https://reviews.apache.org/r/41365/diff/


Testing
---

./gradlew clean build && ./gradlew checkstyleMain checkstyleTest


Thanks,

Tao Feng



Re: Executing Samza jobs natively in Kubernetes

2015-12-14 Thread Elias Levy
On Mon, Dec 14, 2015 at 11:55 AM, Jagadish Venkatraman <
jagadish1...@gmail.com> wrote:

>
> Thanks for the great work! This is super-helpful. Another cool feature is
> that this implementation pushes a lot of failure handling/restarting to the
> cluster manager.
>
> I have some questions on the implementation.
>
> 1. It's my understanding that each container is a separate pod. When a pod
> crashes, I assume the *Kubelet* would bring the job back up on the **same**
> node (and we can reuse state, since the volume is mounted as an *emptyDir*
> volume)? This is valuable for stateful jobs.
>

Correct.  I create an emptyDir state volume on purpose, so that if the
Docker container dies for some reason, the local state won't go away and
will be available to the Samza container when it is restarted by Kubelet.
I do the same thing for the logs, so you can fetch them and debug any
issues that may be causing the container to fail.
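
For illustration, the relevant fragment of each Pod spec looks roughly like
this (a minimal sketch; the volume names and mount paths are illustrative,
not necessarily what the repo generates):

  spec:
    containers:
    - name: samza-container
      image: my-samza-job:latest
      volumeMounts:
      - name: samza-state
        mountPath: /var/samza/state   # local state stores
      - name: samza-logs
        mountPath: /var/samza/logs    # logs kept around for debugging
    volumes:
    - name: samza-state
      emptyDir: {}
    - name: samza-logs
      emptyDir: {}

Note that an emptyDir volume lives for as long as the Pod stays on its node:
it survives container crashes and restarts, but is removed if the Pod itself
is deleted or rescheduled elsewhere.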



> 2. Also, the containerId and the jobModel are passed to the pod as
> environment variables. How do we ensure that, after a restart, the pods
> that come up (potentially on a different host) have the same containerId?
> (Does Kubernetes help here by launching the process with the same
> environment variables as the original run?)
>
> This behavior is key to ensuring that we process all partitions, with no
> two containers processing the same partition.
>

At the moment Kubernetes does not have a specific abstraction to model
stateful services that have unique identities within the cluster (e.g. a
Samza job, a Kafka cluster, a ZooKeeper ensemble), but it is easy to work
around this limitation.  You do so by creating one ReplicationController,
with a replica count of one, per group member.  In the case of a Samza job,
my code creates an RC per SamzaContainer.  Each RC has a slightly different
Pod spec.  Each Pod is parametrized with a unique Samza container ID via the
SAMZA_CONTAINER_ID environment variable.  So the Kubernetes YAML the code
outputs ensures there is only one RC per container ID, and the RC ensures
there is only ever a single container running for its given configuration.
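
For concreteness, each generated controller looks roughly like the sketch
below; the one-RC-per-container pattern and the SAMZA_CONTAINER_ID variable
come from the description above, while the remaining names and values are
illustrative:

  apiVersion: v1
  kind: ReplicationController
  metadata:
    name: wikipedia-parser-0            # one RC per Samza container ID
  spec:
    replicas: 1                         # exactly one pod per container ID
    selector:
      job: wikipedia-parser
      container: "0"
    template:
      metadata:
        labels:
          job: wikipedia-parser
          container: "0"
      spec:
        containers:
        - name: samza-container
          image: my-samza-job:latest
          env:
          - name: SAMZA_CONTAINER_ID    # unique ID for this container
            value: "0"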

Make sense?