Repository: samza
Updated Branches:
  refs/heads/master 45f2558e9 -> affc312d3
Cleanup the connectors-overview and Kafka-connector sections. Use System/StreamDescriptors

Author: Jagadish <jvenkatra...@linkedin.com>
Reviewers: Jagadish <jagad...@apache.org>

Closes #778 from vjagadish1989/website-reorg26

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/affc312d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/affc312d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/affc312d

Branch: refs/heads/master
Commit: affc312d34f459241ac61d4a8a7b51d79b587dee
Parents: 45f2558
Author: Jagadish <jvenkatra...@linkedin.com>
Authored: Sun Oct 28 11:29:07 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Sun Oct 28 11:29:07 2018 -0700

----------------------------------------------------------------------
 .../documentation/versioned/connectors/kafka.md | 168 +++++++++++--------
 .../versioned/connectors/overview.md            |  38 ++---
 2 files changed, 112 insertions(+), 94 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/affc312d/docs/learn/documentation/versioned/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/kafka.md b/docs/learn/documentation/versioned/connectors/kafka.md
index b6e78e4..447bfdc 100644
--- a/docs/learn/documentation/versioned/connectors/kafka.md
+++ b/docs/learn/documentation/versioned/connectors/kafka.md
@@ -19,108 +19,132 @@ title: Kafka Connector
 limitations under the License.
 -->
-## Overview
-Samza offers built-in integration with Apache Kafka for stream processing. A common pattern in Samza applications is to read messages from one or more Kafka topics, process it and emit results to other Kafka topics or external systems.
+### Kafka I/O: QuickStart
+Samza offers built-in integration with Apache Kafka for stream processing.
A common pattern in Samza applications is to read messages from one or more Kafka topics, process them and emit results to other Kafka topics or databases.
-## Consuming from Kafka
+The `hello-samza` project includes multiple examples of interacting with Kafka from your Samza jobs. Each example also includes instructions on how to run it and view results.
-### <a name="kafka-basic-configuration"></a>Basic Configuration
+- [High-level API Example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/FilterExample.java) with a corresponding [tutorial](/learn/documentation/{{site.version}}/deployment/yarn.html#starting-your-application-on-yarn)
-The example below provides a basic example for configuring a system called `kafka-cluster-1` that uses the provided KafkaSystemFactory.
+- [Low-level API Example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java) with a corresponding [tutorial](https://github.com/apache/samza-hello-samza#hello-samza)
-{% highlight jproperties %}
-# Set the SystemFactory implementation to instantiate KafkaSystemConsumer, KafkaSystemProducer and KafkaSystemAdmin
-systems.kafka-cluster-1.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-# Define the default key and message SerDe.
-systems.kafka-cluster-1.default.stream.samza.key.serde=string
-systems.kafka-cluster-1.default.stream.samza.msg.serde=json
+### Concepts
-# Zookeeper nodes of the Kafka cluster
-systems.kafka-cluster-1.consumer.zookeeper.connect=localhost:2181
+#### KafkaSystemDescriptor
-# List of network endpoints where Kafka brokers are running. Also needed by consumers for querying metadata.
-systems.kafka-cluster-1.producer.bootstrap.servers=localhost:9092,localhost:9093
+Samza refers to any I/O source (e.g., Kafka) it interacts with as a _system_, whose properties are set using a corresponding `SystemDescriptor`.
The `KafkaSystemDescriptor` allows you to describe the Kafka cluster you are interacting with and specify its properties.
+
+{% highlight java %}
+ KafkaSystemDescriptor kafkaSystemDescriptor =
+     new KafkaSystemDescriptor("kafka").withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+         .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
+         .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
{% endhighlight %}
-Samza provides a built-in KafkaSystemDescriptor to consume from and produce to Kafka from the StreamApplication (High-level API) or the TaskApplication (Low-level API).
-Below is an example of how to use the descriptors in the describe method of a StreamApplication.
+#### KafkaInputDescriptor
+A Kafka cluster usually has multiple topics (a.k.a. _streams_). The `KafkaInputDescriptor` allows you to specify the properties of each Kafka topic your application should read from. For each of your input topics, you should create a corresponding instance of `KafkaInputDescriptor`
+by providing a topic name and a serializer.
{% highlight java %}
-public class PageViewFilter implements StreamApplication {
-  @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
-    // add input and output streams
-    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka-cluster-1");
-    KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor("myinput", new JsonSerdeV2<>(PageView.class));
-    KafkaOutputDescriptor<DecoratedPageView> osd = ksd.getOutputDescriptor("myout", new JsonSerdeV2<>(DecordatedPageView.class));
-
-    MessageStream<PageView> ms = appDesc.getInputStream(isd);
-    OutputStream<DecoratedPageView> os = appDesc.getOutputStream(osd);
-
-    ms.filter(this::isValidPageView)
-      .map(this::addProfileInformation)
-      .sendTo(os);
-  }
-}
+ KafkaInputDescriptor<PageView> pageViewStreamDescriptor = kafkaSystemDescriptor.getInputDescriptor("page-view-topic", new JsonSerdeV2<>(PageView.class));
{% endhighlight %}
-Below is an example of how to use the descriptors in the describe method of a TaskApplication
+The above example describes an input Kafka stream from the "page-view-topic", which Samza deserializes into a JSON payload. Samza provides default serializers for common data types such as string, avro, bytes, and integer.
+
+#### KafkaOutputDescriptor
+
+Similarly, the `KafkaOutputDescriptor` allows you to specify the output streams for your application. For each output topic you write to, you should create an instance of `KafkaOutputDescriptor`.
{% highlight java %}
-public class PageViewFilterTask implements TaskApplication {
-  @Override
-  public void describe(TaskApplicationDescriptor appDesc) {
-    // add input and output streams
-    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka-cluster-1");
-    KafkaInputDescriptor<String> isd = ksd.getInputDescriptor("myinput", new StringSerde());
-    KafkaOutputDescriptor<String> osd = ksd.getOutputDescriptor("myout", new StringSerde());
-
-    appDesc.addInputStream(isd);
-    appDesc.addOutputStream(osd);
-    appDesc.addTable(td);
-
-    appDesc.withTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
-  }
-}
+ KafkaOutputDescriptor<DecoratedPageView> decoratedPageView = kafkaSystemDescriptor.getOutputDescriptor("my-output-topic", new JsonSerdeV2<>(DecoratedPageView.class));
{% endhighlight %}
-### Advanced Configuration
-Prefix the configuration with `systems.system-name.consumer.` followed by any of the Kafka consumer configurations. See [Kafka Consumer Configuration Documentation](http://kafka.apache.org/documentation.html#consumerconfigs)
### Configuration
-{% highlight jproperties %}
-systems.kafka-cluster-1.consumer.security.protocol=SSL
-systems.kafka-cluster-1.consumer.max.partition.fetch.bytes=524288
+##### Configuring Kafka producer and consumer
+
+The `KafkaSystemDescriptor` allows you to specify any [Kafka producer](https://kafka.apache.org/documentation/#producerconfigs) or [Kafka consumer](https://kafka.apache.org/documentation/#consumerconfigs) property, which is passed directly to the underlying Kafka client. This allows for
+precise control over the KafkaProducer and KafkaConsumer used by Samza.
+
+{% highlight java %}
+ KafkaSystemDescriptor kafkaSystemDescriptor =
+     new KafkaSystemDescriptor("kafka").withConsumerZkConnect(..)
+         .withProducerBootstrapServers(..)
+         .withConsumerConfigs(..)
+         .withProducerConfigs(..)
{% endhighlight %}
-## Producing to Kafka
-### Basic Configuration
+#### Accessing an offset which is out-of-range
+This setting determines the behavior if a consumer attempts to read an offset that is outside the current valid range maintained by the broker. This could happen if the topic does not exist, or if a checkpoint is older than the maximum message history retained by the brokers.
-The basic configuration is the same as [Consuming from Kafka](#kafka-basic-configuration).
+{% highlight java %}
+ KafkaSystemDescriptor kafkaSystemDescriptor =
+     new KafkaSystemDescriptor("kafka").withConsumerZkConnect(..)
+         .withProducerBootstrapServers(..)
+         .withConsumerAutoOffsetReset("largest")
+{% endhighlight %}
-### Advanced Configuration
-#### Changelog to Kafka for State Stores
+##### Ignoring checkpointed offsets
+Samza periodically persists the last processed Kafka offsets as a part of its checkpoint. During startup, Samza resumes consumption from the previously checkpointed offsets by default. You can override this behavior and configure Samza to ignore checkpoints with `KafkaInputDescriptor#shouldResetOffset()`.
+When there are no checkpoints for a stream, `#withOffsetDefault(..)` determines whether we start consumption from the oldest or newest offset.
-For Samza processors that have local state and is configured with a changelog for durability, if the changelog is configured to use Kafka, there are Kafka specific configuration parameters.
-See section on `TODO: link to state management section` State Management `\TODO` for more details.
+
+{% highlight java %}
+KafkaInputDescriptor<PageView> pageViewStreamDescriptor =
+    kafkaSystemDescriptor.getInputDescriptor("page-view-topic", new JsonSerdeV2<>(PageView.class))
+        .shouldResetOffset()
+        .withOffsetDefault(OffsetType.OLDEST);
-{% highlight jproperties %}
-stores.store-name.changelog=kafka-cluster-2.changelog-topic-name
-stores.store-name.changelog.replication.factor=3
-stores.store-name.changelog.kafka.cleanup.policy=compact
{% endhighlight %}
-#### Performance Tuning
+The above example configures Samza to ignore checkpointed offsets for `page-view-topic` and consume from the oldest available offset during startup. You can configure this behavior to apply to all topics in the Kafka cluster by using `KafkaSystemDescriptor#withDefaultStreamOffsetDefault`.
+
+
-Increasing the consumer fetch buffer thresholds may improve throughput at the expense of memory by buffering more messages. Run some performance analysis to find the optimal values.
+### Code walkthrough
+
+In this section, we walk through a complete example.
+
+#### High-level API
+{% highlight java %}
+// Define coordinates of the Kafka cluster using the KafkaSystemDescriptor
+1 KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka")
+2     .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+3     .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS);
+
+// Create a KafkaInputDescriptor for your input topic and a KafkaOutputDescriptor for the output topic
+4 KVSerde<String, PageView> serde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class));
+5 KafkaInputDescriptor<KV<String, PageView>> inputDescriptor =
+6     kafkaSystemDescriptor.getInputDescriptor("page-views", serde);
+7 KafkaOutputDescriptor<KV<String, PageView>> outputDescriptor =
+8     kafkaSystemDescriptor.getOutputDescriptor("filtered-page-views", serde);
+
+// Obtain a message stream for the input topic
+9 MessageStream<KV<String, PageView>> pageViews = appDescriptor.getInputStream(inputDescriptor);
+
+// Obtain an output stream for the output topic
+10 OutputStream<KV<String, PageView>> filteredPageViews = appDescriptor.getOutputStream(outputDescriptor);
+
+// Write filtered results to the output topic
+11 pageViews
+12     .filter(kv -> !INVALID_USER_ID.equals(kv.value.userId))
+13     .sendTo(filteredPageViews);
-{% highlight jproperties %}
-# Max number of messages to buffer across all Kafka input topic partitions per container. Default is 50000 messages.
-systems.kafka-cluster-1.samza.fetch.threshold=10000
-# Max buffer size by bytes. This configuration takes precedence over the above configuration if value is not -1. Default is -1.
-systems.kafka-cluster-1.samza.fetch.threshold.bytes=-1
{% endhighlight %}
+
+- Lines 1-3 create a KafkaSystemDescriptor defining the coordinates of our Kafka cluster
+
+- Lines 4-6 define a KafkaInputDescriptor for our input topic - `page-views`
+
+- Lines 7-8 define a KafkaOutputDescriptor for our output topic - `filtered-page-views`
+
+- Line 9 creates a MessageStream for the input topic so that you can chain operations on it later
+
+- Line 10 creates an OutputStream for the output topic
+
+- Lines 11-13 define a simple pipeline that reads from the input stream and writes filtered results to the output stream
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/affc312d/docs/learn/documentation/versioned/connectors/overview.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/overview.md b/docs/learn/documentation/versioned/connectors/overview.md
index 5b6ba39..6697b9c 100644
--- a/docs/learn/documentation/versioned/connectors/overview.md
+++ b/docs/learn/documentation/versioned/connectors/overview.md
@@ -20,33 +20,27 @@ title: Connectors overview
 -->
 Stream processing applications often read data from external sources like Kafka or HDFS. Likewise, they require processed
-results to be written to external system or data stores. As of the 1.0 release, Samza integrates with the following systems
-out-of-the-box:
+results to be written to external systems or data stores. Samza is pluggable and designed to support a variety of [producers](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/system/SystemProducer.html) and [consumers](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/system/SystemConsumer.html) for your data. You can
+integrate Samza with any streaming system by implementing the [SystemFactory](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/system/SystemFactory.html) interface.
-- [Apache Kafka](kafka) (consumer/producer)
-- [Microsoft Azure Eventhubs](eventhubs) (consumer/producer)
-- [Amazon AWS Kinesis Streams](kinesis) (consumer)
-- [Hadoop Filesystem](hdfs) (consumer/producer)
-- [Elasticsearch](https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java) (producer)
+The following integrations are supported out-of-the-box:
-Instructions on how to use these connectors can be found in the corresponding subsections. Please note that the
-connector API is different from [Samza Table API](../api/table-api), where the data could be read from and written to
-data stores.
+Consumers:
-Samza is pluggable and designed to support a variety of producers and consumers. You can provide your own producer or
-consumer by implementing the SystemFactory interface.
+- [Apache Kafka](kafka)
-To associate a system with a Samza Connector, the user needs to set the following config:
+- [Microsoft Azure Eventhubs](eventhubs)
-{% highlight jproperties %}
-systems.<system-name>.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-{% endhighlight %}
+- [Amazon AWS Kinesis Streams](kinesis)
-Any system specific configs, could be defined as below:
+- [Hadoop Filesystem](hdfs)
-{% highlight jproperties %}
-systems.<system-name>.param1=value1
-systems.<system-name>.consumer.param2=value2
-systems.<system-name>.producer.param3=value3
-{% endhighlight %}
+Producers:
+- [Apache Kafka](kafka)
+
+- [Microsoft Azure Eventhubs](eventhubs)
+
+- [Hadoop Filesystem](hdfs)
+
+- [Elasticsearch](https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java)
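A custom `SystemFactory` implementation is wired in through configuration by registering it under a system name, as the config removed from this page showed for Kafka. A minimal sketch of that pattern follows; `my-system` and `com.example.MySystemFactory` are hypothetical placeholder names, not real classes:

{% highlight jproperties %}
# Register a hypothetical custom SystemFactory under the system name "my-system"
systems.my-system.samza.factory=com.example.MySystemFactory
{% endhighlight %}

System-specific consumer and producer properties can then be supplied under the same `systems.my-system.` prefix.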