http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b169db04/docs/references/metrics.rst
----------------------------------------------------------------------
diff --git a/docs/references/metrics.rst b/docs/references/metrics.rst
deleted file mode 100644
index fab28b0..0000000
--- a/docs/references/metrics.rst
+++ /dev/null
@@ -1,480 +0,0 @@
-Metrics
-=======
-
-This section lists the metrics exposed by the main classes.
-
-({scope} refers to the current scope value of the passed-in StatsLogger.)
-
-MonitoredFuturePool
--------------------
-
-**{scope}/tasks_pending**
-
-Gauge. How many tasks are pending in this future pool? If this value becomes high, it means that
-the future pool execution rate couldn't keep up with the submission rate. That would cause high
-*task_pending_time*, hence affecting the callers that use this future pool.
-It could also cause heavy jvm gc if this pool keeps building up.
-
-**{scope}/task_pending_time**
-
-OpStats. It measures the time that tasks spend waiting to be executed.
-It becomes high when either *tasks_pending* is building up or *task_execution_time* is high, blocking other
-tasks from executing.
-
-**{scope}/task_execution_time**
-
-OpStats. It measures the time that tasks spend on execution. If it becomes high,
-it would block other tasks from executing when there aren't enough threads in this executor, hence causing high
-*task_pending_time* and impacting end-user latency.
-
-**{scope}/task_enqueue_time**
-
-OpStats. The time that tasks spend on submission. The submission time also impacts end-user latency.
-
-MonitoredScheduledThreadPoolExecutor
-------------------------------------
-
-**{scope}/pending_tasks**
-
-Gauge. How many tasks are pending in this thread pool executor? If this value becomes high, it means that
-the thread pool executor execution rate couldn't keep up with the submission rate. That would cause high
-*task_pending_time*, hence affecting the callers that use this executor. It could also cause heavy jvm gc if
-the queue keeps building up.
-
-**{scope}/completed_tasks**
-
-Gauge. How many tasks are completed in this thread pool executor?
-
-**{scope}/total_tasks**
-
-Gauge. How many tasks are submitted to this thread pool executor?
-
-**{scope}/task_pending_time**
-
-OpStats. It measures the time that tasks spend waiting to be executed.
-It becomes high when either *pending_tasks* is building up or *task_execution_time* is high, blocking other
-tasks from executing.
-
-**{scope}/task_execution_time**
-
-OpStats. It measures the time that tasks spend on execution. If it becomes high,
-it would block other tasks from executing when there aren't enough threads in this executor, hence causing high
-*task_pending_time* and impacting end-user latency.
-
-OrderedScheduler
-----------------
-
-OrderedScheduler is a thread pool based *ScheduledExecutorService*. It is composed of multiple
-MonitoredScheduledThreadPoolExecutor_ instances. Each MonitoredScheduledThreadPoolExecutor_ is wrapped into a
-MonitoredFuturePool_, so both aggregated stats and per-executor stats are exposed.
-
-Aggregated Stats
-~~~~~~~~~~~~~~~~
-
-**{scope}/task_pending_time**
-
-OpStats. It measures the time that tasks spend waiting to be executed.
-It becomes high when either *pending_tasks* is building up or *task_execution_time* is high, blocking other
-tasks from executing.
-
-**{scope}/task_execution_time**
-
-OpStats. It measures the time that tasks spend on execution. If it becomes high,
-it would block other tasks from executing when there aren't enough threads in this executor, hence causing high
-*task_pending_time* and impacting end-user latency.
-
-**{scope}/futurepool/tasks_pending**
-
-Gauge. How many tasks are pending in this future pool? If this value becomes high, it means that
-the future pool execution rate couldn't keep up with the submission rate. That would cause high
-*task_pending_time*, hence affecting the callers that use this future pool.
-It could also cause heavy jvm gc if this pool keeps building up.
-
-**{scope}/futurepool/task_pending_time**
-
-OpStats. It measures the time that tasks spend waiting to be executed.
-It becomes high when either *tasks_pending* is building up or *task_execution_time* is high, blocking other
-tasks from executing.
-
-**{scope}/futurepool/task_execution_time**
-
-OpStats. It measures the time that tasks spend on execution. If it becomes high,
-it would block other tasks from executing when there aren't enough threads in this executor, hence causing high
-*task_pending_time* and impacting end-user latency.
-
-**{scope}/futurepool/task_enqueue_time**
-
-OpStats. The time that tasks spend on submission. The submission time also impacts end-user latency.
-
-Per Executor Stats
-~~~~~~~~~~~~~~~~~~
-
-Stats about individual executors are exposed under *{scope}/{name}-executor-{id}-0*. *{name}* is the scheduler
-name and *{id}* is the index of the executor in the pool. The corresponding stats of its futurepool are exposed
-under *{scope}/{name}-executor-{id}-0/futurepool*. See MonitoredScheduledThreadPoolExecutor_ and MonitoredFuturePool_
-for more details.
-
-ZooKeeperClient
----------------
-
-Operation Stats
-~~~~~~~~~~~~~~~
-
-All operation stats are exposed under {scope}/zk. The stats are **latency** *OpStats*
-on zookeeper operations.
-
-**{scope}/zk/{op}**
-
-Latency stats on operations.
-These operations are *create_client*, *get_data*, *set_data*, *delete*, *get_children*, *multi*, *get_acl*, *set_acl* and *sync*.
-
-Watched Event Stats
-~~~~~~~~~~~~~~~~~~~
-
-All stats on zookeeper watched events are exposed under {scope}/watcher. The stats are *Counters*
-for the watched events that this client receives:
-
-**{scope}/watcher/state/{keeper_state}**
-
-The number of `KeeperState` changes that this client received. The states are *Disconnected*, *SyncConnected*,
-*AuthFailed*, *ConnectedReadOnly*, *SaslAuthenticated* and *Expired*. Monitoring metrics like *SyncConnected*
-or *Expired* helps in understanding the health of this zookeeper client.
-
-**{scope}/watcher/events/{event}**
-
-The number of `Watcher.Event`s received by this client. Those events are *None*, *NodeCreated*, *NodeDeleted*,
-*NodeDataChanged*, *NodeChildrenChanged*.
-
-Watcher Manager Stats
-~~~~~~~~~~~~~~~~~~~~~
-
-This ZooKeeperClient provides a watcher manager to manage watchers for applications. It tracks the mapping between
-paths and watchers, which provides the ability to remove watchers. The stats are *Gauges* for the number
-of watchers managed by this zookeeper client.
-
-**{scope}/watcher_manager/total_watches**
-
-Total number of watches that are managed by this watcher manager. If it keeps growing, it usually means that
-watchers are leaking (resources aren't being closed properly), which will eventually cause OOM.
-
-**{scope}/watcher_manager/num_child_watches**
-
-Total number of paths that are watched by this watcher manager.
-
-BookKeeperClient
-----------------
-
-TODO: add bookkeeper stats here
-
-DistributedReentrantLock
-------------------------
-
-All stats related to locks are exposed under {scope}/lock.
-
-**{scope}/acquire**
-
-OpStats. It measures the time spent on acquiring locks.
-
-**{scope}/release**
-
-OpStats. It measures the time spent on releasing locks.
-
-**{scope}/reacquire**
-
-OpStats. The lock expires when the underlying zookeeper session expires. The
-reentrant lock will attempt to re-acquire the lock automatically on session expiry.
-This metric measures the time spent on re-acquiring locks.
-
-**{scope}/internalTryRetries**
-
-Counter. The number of retries that locks spend on re-creating internal locks. Typically,
-a new internal lock will be created when the session expires.
-
-**{scope}/acquireTimeouts**
-
-Counter. The number of timeouts that callers experienced when acquiring locks.
-
-**{scope}/tryAcquire**
-
-OpStats. It measures the time that each internal lock spends on
-acquiring.
-
-**{scope}/tryTimeouts**
-
-Counter. The number of timeouts encountered when internal locks try acquiring.
-
-**{scope}/unlock**
-
-OpStats. It measures the time that the caller spends on unlocking
-internal locks.
-
-BKLogHandler
-------------
-
-The log handler is a base class for managing log segments, so all the metrics in this class are
-related to log segment retrieval and exposed under {scope}/logsegments. They are all `OpStats` in
-the format of `{scope}/logsegments/{op}`. Those operations are:
-
-* force_get_list: force getting the list of log segments.
-* get_list: get the list of log segments. It might just retrieve from the local log segment cache.
-* get_filtered_list: get the filtered list of log segments.
-* get_full_list: get the full list of log segments.
-* get_inprogress_segment: time between when an inprogress log segment is created and when the handler reads it.
-* get_completed_segment: time between when a log segment is completed and when the handler reads it.
-* negative_get_inprogress_segment: record the negative values for `get_inprogress_segment`.
-* negative_get_completed_segment: record the negative values for `get_completed_segment`.
-* recover_last_entry: recovering the last entry from a log segment.
-* recover_scanned_entries: the number of entries that are scanned during recovery.
-
-See BKLogWriteHandler_ for write handlers.
-
-See BKLogReadHandler_ for read handlers.
-
-BKLogReadHandler
-----------------
-
-The core logic in the log read handler is the readahead worker. Most of the readahead stats are exposed under
-{scope}/readahead_worker.
-
-**{scope}/readahead_worker/wait**
-
-Counter. Number of times the readahead worker waits. If this keeps increasing, it usually means the
-readahead cache keeps getting full because the reader is reading slowly.
-
-**{scope}/readahead_worker/repositions**
-
-Counter. Number of repositions that the readahead worker encounters. A reposition means that the readahead worker
-finds that it isn't advancing to a new log segment and forces re-positioning.
-
-**{scope}/readahead_worker/entry_piggy_back_hits**
-
-Counter. It increases when the last add confirmed is advanced because of the piggy-backed lac.
-
-**{scope}/readahead_worker/entry_piggy_back_misses**
-
-Counter. It increases when the last add confirmed isn't advanced by a read entry because it doesn't
-piggy-back a newer lac.
-
-**{scope}/readahead_worker/read_entries**
-
-OpStats. Stats on the number of entries read per readahead read batch.
-
-**{scope}/readahead_worker/read_lac_counter**
-
-Counter. Stats on the number of readLastConfirmed operations.
-
-**{scope}/readahead_worker/read_lac_and_entry_counter**
-
-Counter. Stats on the number of readLastConfirmedAndEntry operations.
-
-**{scope}/readahead_worker/cache_full**
-
-Counter. It increases each time the readahead worker finds the cache has become full. If it keeps increasing,
-it means the reader is reading slowly.
-
-**{scope}/readahead_worker/resume**
-
-OpStats. Stats on the readahead worker resuming reading from the wait state.
-
-**{scope}/readahead_worker/long_poll_interruption**
-
-OpStats. Stats on the number of interruptions that happen to long polls. The interruptions are usually
-caused by receiving zookeeper notifications.
-
-**{scope}/readahead_worker/notification_execution**
-
-OpStats. Stats on executions over the notifications received from zookeeper.
-
-**{scope}/readahead_worker/metadata_reinitialization**
-
-OpStats. Stats on metadata reinitialization after receiving notifications of log segment updates.
-
-**{scope}/readahead_worker/idle_reader_warn**
-
-Counter. It increases each time the readahead worker detects itself becoming idle.
-
-BKLogWriteHandler
------------------
-
-Log write handlers are responsible for log segment creation/deletion. All the metrics are exposed under
-{scope}/segments.
-
-**{scope}/segments/open**
-
-OpStats. Latency characteristics on starting a new log segment.
-
-**{scope}/segments/close**
-
-OpStats. Latency characteristics on completing an inprogress log segment.
-
-**{scope}/segments/recover**
-
-OpStats. Latency characteristics on recovering a log segment.
-
-**{scope}/segments/delete**
-
-OpStats. Latency characteristics on deleting a log segment.
-
-BKAsyncLogWriter
-----------------
-
-**{scope}/log_writer/write**
-
-OpStats. Latency characteristics of write operations.
-
-**{scope}/log_writer/write/queued**
-
-OpStats. Latency characteristics of the time that write operations spend in the queue.
-`{scope}/log_writer/write` latency might be high because write operations are pending
-in the queue for a long time due to log segment rolling.
-
-**{scope}/log_writer/bulk_write**
-
-OpStats. Latency characteristics of bulk_write operations.
-
-**{scope}/log_writer/bulk_write/queued**
-
-OpStats. Latency characteristics of the time that bulk_write operations spend in the queue.
-`{scope}/log_writer/bulk_write` latency might be high because write operations are pending
-in the queue for a long time due to log segment rolling.
-
-**{scope}/log_writer/get_writer**
-
-OpStats. The time spent on getting the writer. It could spike when log segment rolling
-happens while getting the writer. It is a good stat to look into when the latency is caused by
-queuing time.
-
-**{scope}/log_writer/pending_request_dispatch**
-
-Counter. The number of queued operations that are dispatched after a log segment is rolled. It is
-a metric measuring how many operations have been queued because of log segment rolling.
-
-BKAsyncLogReader
-----------------
-
-**{scope}/async_reader/future_set**
-
-OpStats. Time spent on satisfying futures of read requests. If it is high, it means that the caller
-takes time processing the results of read requests. The side effect is blocking subsequent reads.
-
-**{scope}/async_reader/schedule**
-
-OpStats. Time spent on scheduling next reads.
-
-**{scope}/async_reader/background_read**
-
-OpStats. Time spent on background reads.
-
-**{scope}/async_reader/read_next_exec**
-
-OpStats. Time spent on executing `reader#readNext()`.
-
-**{scope}/async_reader/time_between_read_next**
-
-OpStats. Time between two consecutive `reader#readNext()` calls. If it is high, it means that
-the caller is slowing down in calling `reader#readNext()`.
-
-**{scope}/async_reader/delay_until_promise_satisfied**
-
-OpStats. Total latency for the read requests.
-
-**{scope}/async_reader/idle_reader_error**
-
-Counter. The number of idle reader errors.
-
-BKDistributedLogManager
------------------------
-
-Future Pools
-~~~~~~~~~~~~
-
-The stats about future pools used by writers are exposed under {scope}/writer_future_pool,
-while the stats about future pools used by readers are exposed under {scope}/reader_future_pool.
-See MonitoredFuturePool_ for detailed stats.
-
-Distributed Locks
-~~~~~~~~~~~~~~~~~
-
-The stats about the locks used by writers are exposed under {scope}/lock while those used by readers
-are exposed under {scope}/read_lock/lock. See DistributedReentrantLock_ for detailed stats.
-
-Log Handlers
-~~~~~~~~~~~~
-
-**{scope}/logsegments**
-
-All basic stats of log handlers are exposed under {scope}/logsegments. See BKLogHandler_ for detailed stats.
-
-**{scope}/segments**
-
-The stats about write log handlers are exposed under {scope}/segments. See BKLogWriteHandler_ for detailed stats.
-
-**{scope}/readahead_worker**
-
-The stats about read log handlers are exposed under {scope}/readahead_worker.
-See BKLogReadHandler_ for detailed stats.
-
-Writers
-~~~~~~~
-
-All writer related metrics are exposed under {scope}/log_writer. See BKAsyncLogWriter_ for detailed stats.
-
-Readers
-~~~~~~~
-
-All reader related metrics are exposed under {scope}/async_reader. See BKAsyncLogReader_ for detailed stats.
-
-BKDistributedLogNamespace
--------------------------
-
-ZooKeeper Clients
-~~~~~~~~~~~~~~~~~
-
-There are various zookeeper clients created per namespace for different purposes:
-
-**{scope}/dlzk_factory_writer_shared**
-
-Stats about the zookeeper client shared by all DL writers.
-
-**{scope}/dlzk_factory_reader_shared**
-
-Stats about the zookeeper client shared by all DL readers.
-
-**{scope}/bkzk_factory_writer_shared**
-
-Stats about the zookeeper client used by the bookkeeper client that is shared by all DL writers.
-
-**{scope}/bkzk_factory_reader_shared**
-
-Stats about the zookeeper client used by the bookkeeper client that is shared by all DL readers.
-
-See ZooKeeperClient_ for detailed zookeeper stats.
-
-BookKeeper Clients
-~~~~~~~~~~~~~~~~~~
-
-All the bookkeeper client related stats are exposed directly under the current {scope}. See BookKeeperClient_
-for detailed stats.
-
-Utils
-~~~~~
-
-**{scope}/factory/thread_pool**
-
-Stats about the ordered scheduler used by this namespace. See OrderedScheduler_ for detailed stats.
-
-**{scope}/factory/readahead_thread_pool**
-
-Stats about the readahead thread pool executor used by this namespace. See MonitoredScheduledThreadPoolExecutor_
-for detailed stats.
-
-**{scope}/writeLimiter**
-
-Stats about the global write limiter used by this namespace.
-
-DistributedLogManager
-~~~~~~~~~~~~~~~~~~~~~
-
-All the core stats about readers and writers are exposed under the current {scope} via BKDistributedLogManager_.
-
-
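[Editor's note: to make the `{scope}` convention above concrete, here is a minimal Java sketch of how a component typically registers such metrics against the bookkeeper-stats API. The `WatcherManagerStats` class and its `watches` map are hypothetical illustrations; only the `scope`/`registerGauge` calls reflect the actual `StatsLogger` API.]

::

    import org.apache.bookkeeper.stats.Gauge;
    import org.apache.bookkeeper.stats.StatsLogger;

    class WatcherManagerStats {
        WatcherManagerStats(StatsLogger parent, final java.util.Map<String, ?> watches) {
            // Deriving a child scope means metric names are exported as
            // {scope}/watcher_manager/... relative to the passed-in StatsLogger.
            StatsLogger scope = parent.scope("watcher_manager");
            scope.registerGauge("total_watches", new Gauge<Number>() {
                @Override
                public Number getDefaultValue() {
                    return 0;
                }

                @Override
                public Number getSample() {
                    // Sampled on demand by the stats provider; a steadily
                    // growing value indicates leaking watchers (see above).
                    return watches.size();
                }
            });
        }
    }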
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b169db04/docs/start/building.rst
----------------------------------------------------------------------
diff --git a/docs/start/building.rst b/docs/start/building.rst
new file mode 100644
index 0000000..5a821f2
--- /dev/null
+++ b/docs/start/building.rst
@@ -0,0 +1,92 @@
+---
+title: Build DistributedLog from Source
+top-nav-group: setup
+top-nav-pos: 1
+top-nav-title: Build DistributedLog from Source
+layout: default
+---
+
+.. contents:: This page covers how to build DistributedLog {{ site.distributedlog_version }} from source.
+
+Build DistributedLog
+====================
+
+In order to build DistributedLog you need the source code. Either `download the source of a release`_ or `clone the git repository`_.
+
+.. _download the source of a release: {{ site.baseurl }}/download
+.. _clone the git repository: {{ site.github_url }}
+
+In addition, you need **Maven 3** and a **JDK** (Java Development Kit). DistributedLog requires **at least Java 7** to build. We recommend using Java 8.
+
+To clone from git, enter:
+
+.. code-block:: bash
+
+    git clone {{ site.github_url }}
+
+
+The simplest way of building DistributedLog is by running:
+
+.. code-block:: bash
+
+    mvn clean package -DskipTests
+
+
+This instructs Maven_ (`mvn`) to first remove all existing builds (`clean`) and then create a new DistributedLog package (`package`). The `-DskipTests` option prevents Maven from executing the tests.
+
+.. _Maven: http://maven.apache.org
+
+Build
+~~~~~
+
+- Build all the components without running tests
+
+.. code-block:: bash
+
+    mvn clean package -DskipTests
+
+- Build all the components and run all the tests
+
+.. code-block:: bash
+
+    mvn clean package
+
+
+- Build a single component: since distributedlog uses the shade plugin, which only runs during packaging, pre-install the dependencies before building a single component.
+
+.. code-block:: bash
+
+    mvn clean install -DskipTests
+    mvn -pl :<module-name> package [-DskipTests] // example: mvn -pl :distributedlog-core package
+
+
+- Test a single class: since distributedlog uses the shade plugin, which only runs during packaging, pre-install the dependencies before testing a single class.
+
+.. code-block:: bash
+
+    mvn clean install -DskipTests
+    mvn -pl :<module-name> clean test -Dtest=<test-class-name>
+
+
+Scala Versions
+~~~~~~~~~~~~~~
+
+DistributedLog has dependencies such as `Twitter Util`_ and Finagle_ that are written in Scala_. Users of the Scala API and libraries may have to match the Scala version of DistributedLog with the Scala version of their projects (because Scala is not strictly backwards compatible).
+
+.. _Twitter Util: https://twitter.github.io/util/
+.. _Finagle: https://twitter.github.io/finagle/
+.. _Scala: http://scala-lang.org
+
+**By default, DistributedLog is built with Scala 2.11**. To build DistributedLog with Scala *2.10*, you can change the default Scala *binary version* with the following script:
+
+.. code-block:: bash
+
+    # Switch Scala binary version between 2.10 and 2.11
+    tools/change-scala-version.sh 2.10
+    # Build with Scala version 2.10
+    mvn clean install -DskipTests
+
+
+DistributedLog is developed against Scala *2.11* and tested additionally against Scala *2.10*. These two versions are known to be compatible. Earlier versions (like Scala *2.9*) are *not* compatible.
+
+Newer versions may be compatible, depending on breaking changes in the language features used by DistributedLog's dependencies, and the availability of DistributedLog's dependencies in those Scala versions.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b169db04/docs/start/download.rst
----------------------------------------------------------------------
diff --git a/docs/start/download.rst b/docs/start/download.rst
new file mode 100644
index 0000000..7bd4a8b
--- /dev/null
+++ b/docs/start/download.rst
@@ -0,0 +1,73 @@
+---
+title: Download Releases
+top-nav-group: setup
+top-nav-pos: 2
+top-nav-title: Download Releases
+layout: default
+---
+
+.. contents:: This page covers how to download DistributedLog releases.
+
+Releases
+========
+
+`0.3.51-RC1` is the latest release.
+
+You can verify your download by checking its md5 and sha1.
+
+0.3.51-RC1
+~~~~~~~~~~
+
+This is the second release candidate for 0.3.51.
+
+- Source download: 0.3.51-RC1.zip_
+- Binary downloads:
+  - Service: distributedlog-service-3ff9e33fa577f50eebb8ee971ddb265c971c3717.zip_
+  - Benchmark: distributedlog-benchmark-3ff9e33fa577f50eebb8ee971ddb265c971c3717.zip_
+  - Tutorials: distributedlog-tutorials-3ff9e33fa577f50eebb8ee971ddb265c971c3717.zip_
+  - All: distributedlog-all-3ff9e33fa577f50eebb8ee971ddb265c971c3717.zip_
+
+.. _0.3.51-RC1.zip: https://github.com/twitter/distributedlog/archive/0.3.51-RC1.zip
+.. _distributedlog-all-3ff9e33fa577f50eebb8ee971ddb265c971c3717.zip: https://github.com/twitter/distributedlog/releases/download/0.3.51-RC1/distributedlog-all-3ff9e33fa577f50eebb8ee971ddb265c971c3717.zip
+.. _distributedlog-service-3ff9e33fa577f50eebb8ee971ddb265c971c3717.zip: https://github.com/twitter/distributedlog/releases/download/0.3.51-RC1/distributedlog-service-3ff9e33fa577f50eebb8ee971ddb265c971c3717.zip
+.. _distributedlog-benchmark-3ff9e33fa577f50eebb8ee971ddb265c971c3717.zip: https://github.com/twitter/distributedlog/releases/download/0.3.51-RC1/distributedlog-benchmark-3ff9e33fa577f50eebb8ee971ddb265c971c3717.zip
+.. _distributedlog-tutorials-3ff9e33fa577f50eebb8ee971ddb265c971c3717.zip: https://github.com/twitter/distributedlog/releases/download/0.3.51-RC1/distributedlog-tutorials-3ff9e33fa577f50eebb8ee971ddb265c971c3717.zip
+
+0.3.51-RC0
+~~~~~~~~~~
+
+This is the first release candidate for 0.3.51_.
+
+- Source download: 0.3.51-RC0.zip_
+- Binary downloads:
+  - Service: distributedlog-service-63d214d3a739cb58a71a8b51127f165d15f00584.zip_
+  - Benchmark: distributedlog-benchmark-63d214d3a739cb58a71a8b51127f165d15f00584.zip_
+  - Tutorials: distributedlog-tutorials-63d214d3a739cb58a71a8b51127f165d15f00584.zip_
+  - All: distributedlog-all-63d214d3a739cb58a71a8b51127f165d15f00584.zip_
+
+.. _0.3.51: https://github.com/twitter/distributedlog/releases/tag/0.3.51-RC0
+.. _0.3.51-RC0.zip: https://github.com/twitter/distributedlog/archive/0.3.51-RC0.zip
+.. _distributedlog-all-63d214d3a739cb58a71a8b51127f165d15f00584.zip: https://github.com/twitter/distributedlog/releases/download/0.3.51-RC0/distributedlog-all-63d214d3a739cb58a71a8b51127f165d15f00584.zip
+.. _distributedlog-service-63d214d3a739cb58a71a8b51127f165d15f00584.zip: https://github.com/twitter/distributedlog/releases/download/0.3.51-RC0/distributedlog-service-63d214d3a739cb58a71a8b51127f165d15f00584.zip
+.. _distributedlog-benchmark-63d214d3a739cb58a71a8b51127f165d15f00584.zip: https://github.com/twitter/distributedlog/releases/download/0.3.51-RC0/distributedlog-benchmark-63d214d3a739cb58a71a8b51127f165d15f00584.zip
+.. _distributedlog-tutorials-63d214d3a739cb58a71a8b51127f165d15f00584.zip: https://github.com/twitter/distributedlog/releases/download/0.3.51-RC0/distributedlog-tutorials-63d214d3a739cb58a71a8b51127f165d15f00584.zip
+
+Maven Dependencies
+==================
+
+You can add the following dependencies to your `pom.xml` to include Apache DistributedLog in your project.
+
+.. code-block:: xml
+
+    <!-- use the core library to access DL storage -->
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>distributedlog-core_2.11</artifactId>
+      <version>{{ site.DL_VERSION_STABLE }}</version>
+    </dependency>
+    <!-- use the thin proxy client to access DL via the write proxy -->
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>distributedlog-client_2.11</artifactId>
+      <version>{{ site.DL_VERSION_STABLE }}</version>
+    </dependency>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b169db04/docs/start/quickstart.rst
----------------------------------------------------------------------
diff --git a/docs/start/quickstart.rst b/docs/start/quickstart.rst
new file mode 100644
index 0000000..ce52ac1
--- /dev/null
+++ b/docs/start/quickstart.rst
@@ -0,0 +1,127 @@
+---
+title: Setup & Run Example
+top-nav-group: quickstart
+top-nav-pos: 1
+top-nav-title: Setup & Run Example
+layout: default
+---
+
+.. contents:: Get a DistributedLog cluster up and running locally and run the example program in a few simple steps.
+
+Quick Start
+===========
+
+This tutorial assumes you are starting fresh and have no existing BookKeeper or ZooKeeper data.
+If you already have an existing BookKeeper or ZooKeeper cluster, you can check out the deploy_ section
+for more details on how to deploy a production cluster.
+
+.. _deploy: ../operations/deployment
+
+Step 1: Download the binary
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Download_ the stable version of `DistributedLog` and un-zip it.
+
+.. _Download: ../download
+
+::
+
+    // Download the binary `distributedlog-all-${gitsha}.zip`
+    > unzip distributedlog-all-${gitsha}.zip
+
+
+Step 2: Start ZooKeeper & BookKeeper
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+DistributedLog uses `ZooKeeper` as the metadata store and `BookKeeper` as the log segment store, so
+you need to first start a zookeeper server and a few bookies if you don't already have them. You can
+use the `dlog` script in the `distributedlog-service` package to get a standalone bookkeeper sandbox. It
+starts a zookeeper server and `N` bookies (N is 3 by default).
+
+::
+
+    // Start the local sandbox instance at port `7000`
+    > ./distributedlog-service/bin/dlog local 7000
+    DistributedLog Sandbox is running now. You could access distributedlog://127.0.0.1:7000
+
+
+Step 3: Create a DistributedLog namespace
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Before using distributedlog, you need to create a distributedlog namespace to store your own list of
+streams. The zkServer for the local sandbox is `127.0.0.1:7000` and the bookkeeper's ledgers path is
+`/ledgers`. You could create a namespace pointing to the corresponding bookkeeper cluster.
+
+::
+
+    > ./distributedlog-service/bin/dlog admin bind -l /ledgers -s 127.0.0.1:7000 -c distributedlog://127.0.0.1:7000/messaging/my_namespace
+    No bookkeeper is bound to distributedlog://127.0.0.1:7000/messaging/my_namespace
+    Created binding on distributedlog://127.0.0.1:7000/messaging/my_namespace.
+
+
+If you don't want to create a separate namespace, you could use the default namespace `distributedlog://127.0.0.1:7000/messaging/distributedlog`.
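[Editor's note: if you want to sanity-check the binding from application code rather than the CLI, a minimal Java sketch using the core-library API covered in the basic tutorials later in this commit would look like the following. The `NamespaceCheck` class is hypothetical; the builder and `getLogs` calls are the documented core API.]

::

    import java.net.URI;
    import java.util.Iterator;

    import com.twitter.distributedlog.DistributedLogConfiguration;
    import com.twitter.distributedlog.namespace.DistributedLogNamespace;
    import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;

    public class NamespaceCheck {
        public static void main(String[] args) throws Exception {
            // The URI created by the `dlog admin bind` command above.
            URI uri = URI.create("distributedlog://127.0.0.1:7000/messaging/my_namespace");
            DistributedLogConfiguration conf = new DistributedLogConfiguration();
            DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
                    .conf(conf)
                    .uri(uri)
                    .build();
            // List the streams in the namespace (empty until Step 4 creates some).
            Iterator<String> logs = namespace.getLogs();
            while (logs.hasNext()) {
                System.out.println(logs.next());
            }
            namespace.close();
        }
    }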
+
+
+Step 4: Create some log streams
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Let's create 5 log streams, prefixed with `messaging-stream-`.
+
+::
+
+    > ./distributedlog-service/bin/dlog tool create -u distributedlog://127.0.0.1:7000/messaging/my_namespace -r messaging-stream- -e 1-5
+
+
+We can now see the streams if we run the `list` command from the tool.
+
+::
+
+    > ./distributedlog-service/bin/dlog tool list -u distributedlog://127.0.0.1:7000/messaging/my_namespace
+    Streams under distributedlog://127.0.0.1:7000/messaging/my_namespace :
+    --------------------------------
+    messaging-stream-1
+    messaging-stream-3
+    messaging-stream-2
+    messaging-stream-4
+    messaging-stream-5
+    --------------------------------
+
+
+Step 5: Start a write proxy
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Now, let's start a write proxy server that serves writes to the distributedlog namespace `distributedlog://127.0.0.1:7000/messaging/my_namespace`. The server listens on port 8000 to accept fan-in write requests.
+
+::
+
+    > ./distributedlog-service/bin/dlog-daemon.sh start writeproxy -p 8000 --shard-id 1 -sp 8001 -u distributedlog://127.0.0.1:7000/messaging/my_namespace -mx -c `pwd`/distributedlog-service/conf/distributedlog_proxy.conf
+
+From 0.3.51-RC1 onwards, use the command below to start the write proxy:
+
+::
+
+    > WP_SHARD_ID=1 WP_SERVICE_PORT=8000 WP_STATS_PORT=8001 WP_NAMESPACE='distributedlog://127.0.0.1:7000/messaging/my_namespace' ./distributedlog-service/bin/dlog-daemon.sh start writeproxy
+
+Step 6: Tail reading records
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The distributedlog tutorial has a multi-stream reader that will dump received records to standard output.
+
+::
+
+    > ./distributedlog-tutorials/distributedlog-basic/bin/runner run com.twitter.distributedlog.basic.MultiReader distributedlog://127.0.0.1:7000/messaging/my_namespace messaging-stream-1,messaging-stream-2,messaging-stream-3,messaging-stream-4,messaging-stream-5
+
+
+Step 7: Write some records
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The distributedlog tutorial also has a multi-stream writer that takes input from a console and writes it out
+as records to the distributedlog write proxy. Each line will be sent as a separate record.
+
+Run the writer and type a few lines into the console to send to the server.
+
+::
+
+    > ./distributedlog-tutorials/distributedlog-basic/bin/runner run com.twitter.distributedlog.basic.ConsoleProxyMultiWriter 'inet!127.0.0.1:8000' messaging-stream-1,messaging-stream-2,messaging-stream-3,messaging-stream-4,messaging-stream-5
+
+If you have each of the above commands running in a different terminal, you should now be able to type messages into the writer terminal and see them appear in the reader terminal.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b169db04/docs/styles/site.scss
----------------------------------------------------------------------
diff --git a/docs/styles/site.scss b/docs/styles/site.scss
new file mode 100644
index 0000000..cb6f8f8
--- /dev/null
+++ b/docs/styles/site.scss
@@ -0,0 +1,4 @@
+---
+---
+
+@import "bootstrap";
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b169db04/docs/tutorials/analytics-mapreduce.rst
----------------------------------------------------------------------
diff --git a/docs/tutorials/analytics-mapreduce.rst b/docs/tutorials/analytics-mapreduce.rst
index 0c399bc..6a18d4a 100644
--- a/docs/tutorials/analytics-mapreduce.rst
+++ b/docs/tutorials/analytics-mapreduce.rst
@@ -1 +1,214 @@
-.. markdowninclude:: ../../distributedlog-tutorials/distributedlog-mapreduce/README.md
+---
+title: Tutorial - DistributedLog meets MapReduce
+layout: default
+---
+
+.. contents:: Tutorial - DistributedLog meets MapReduce
+
+DistributedLog meets MapReduce
+==============================
+
+A distributedlog log stream consists of log segments. Each log segment is distributed
+among multiple bookie nodes. This nature of data distribution allows distributedlog to be easily
+integrated with any analytics processing system like *MapReduce* and *Spark*. This tutorial
+shows how you could use *MapReduce* to process log streams' data in batch and how *MapReduce*
+can leverage the data locality of log segments.
+
+InputFormat
+~~~~~~~~~~~
+
+**InputFormat** is one of the fundamental classes in the Hadoop MapReduce framework, used
+for accessing data from different sources. The class is responsible for defining two main
+things:
+
+- Data Splits
+- Record Reader
+
+*Data Split* is a fundamental concept in the Hadoop MapReduce framework, defining both
+the size of individual Map tasks and their potential execution servers. The *Record Reader* is
+responsible for actually reading records from the *data split* and submitting them (as key/value
+pairs) to the mapper.
+
+Using distributedlog log streams as the sources for a MapReduce job, the *log segments* are
+the *data splits*, while the *log segment reader* for a log segment is the *record reader* for
+a *data split*.
+
+Log Segment vs Data Split
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Any split implementation extends the base abstract class - **InputSplit**, defining a
+split length and locations. A distributedlog log segment has a *record count*, which could be used
+to define the length of the split, and its metadata contains the storage nodes that are used to
+store its log records, which could be used to define the locations of the split. So we could
+create a **LogSegmentSplit** wrapping over a *LogSegment* (LogSegmentMetadata and LedgerMetadata).
+
+::
+
+    public class LogSegmentSplit extends InputSplit {
+
+        private LogSegmentMetadata logSegmentMetadata;
+        private LedgerMetadata ledgerMetadata;
+
+        public LogSegmentSplit() {}
+
+        public LogSegmentSplit(LogSegmentMetadata logSegmentMetadata,
+                               LedgerMetadata ledgerMetadata) {
+            this.logSegmentMetadata = logSegmentMetadata;
+            this.ledgerMetadata = ledgerMetadata;
+        }
+
+    }
+
+
+The length of the log segment split is the *number of records in the log segment*.
+
+::
+
+    @Override
+    public long getLength()
+            throws IOException, InterruptedException {
+        return logSegmentMetadata.getRecordCount();
+    }
+
+
+The locations of the log segment split are the bookies' addresses in the ensembles of
+the log segment.
+
+::
+
+    @Override
+    public String[] getLocations()
+            throws IOException, InterruptedException {
+        Set<String> locations = Sets.newHashSet();
+        for (ArrayList<BookieSocketAddress> ensemble : ledgerMetadata.getEnsembles().values()) {
+            for (BookieSocketAddress host : ensemble) {
+                locations.add(host.getHostName());
+            }
+        }
+        return locations.toArray(new String[locations.size()]);
+    }
+
+
+At this point, we will have a basic **LogSegmentSplit** wrapping *LogSegmentMetadata* and
+*LedgerMetadata*. Then we could retrieve the list of log segments of a log stream and construct
+the corresponding *data splits* in the distributedlog inputformat.
+
+::
+
+    public class DistributedLogInputFormat
+            extends InputFormat<DLSN, LogRecordWithDLSN> implements Configurable {
+
+        @Override
+        public List<InputSplit> getSplits(JobContext jobContext)
+                throws IOException, InterruptedException {
+            List<LogSegmentMetadata> segments = dlm.getLogSegments();
+            List<InputSplit> inputSplits = Lists.newArrayListWithCapacity(segments.size());
+            BookKeeper bk = namespace.getReaderBKC().get();
+            LedgerManager lm = BookKeeperAccessor.getLedgerManager(bk);
+            final AtomicInteger rcHolder = new AtomicInteger(0);
+            final AtomicReference<LedgerMetadata> metadataHolder = new AtomicReference<LedgerMetadata>(null);
+            for (LogSegmentMetadata segment : segments) {
+                final CountDownLatch latch = new CountDownLatch(1);
+                lm.readLedgerMetadata(segment.getLedgerId(),
+                        new BookkeeperInternalCallbacks.GenericCallback<LedgerMetadata>() {
+                    @Override
+                    public void operationComplete(int rc, LedgerMetadata ledgerMetadata) {
+                        metadataHolder.set(ledgerMetadata);
+                        rcHolder.set(rc);
+                        latch.countDown();
+                    }
+                });
+                latch.await();
+                if (BKException.Code.OK != rcHolder.get()) {
+                    throw new IOException("Failed to get log segment metadata for " + segment + " : "
+                            + BKException.getMessage(rcHolder.get()));
+                }
+                inputSplits.add(new LogSegmentSplit(segment, metadataHolder.get()));
+            }
+            return inputSplits;
+        }
+
+    }
+
+
+Log Segment Record Reader
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+At this point, we know how to break log streams into *data splits*. Then we need to be able
+to create a **RecordReader** for an individual *data split*. Since each *data split* is effectively
+a *log segment* in distributedlog, it is straightforward to implement it using distributedlog's log segment
+reader. For simplicity, this example uses the raw bk api to access entries, so it doesn't
+leverage features like **ReadAhead** provided in distributedlog. It could be changed to
+use the log segment reader for better performance.
+
+From the *data split*, we know the log segment and its corresponding bookkeeper ledger, so
+we could open the ledger handle when initializing the record reader.
+
+::
+
+    LogSegmentReader(String streamName,
+                     DistributedLogConfiguration conf,
+                     BookKeeper bk,
+                     LogSegmentSplit split)
+            throws IOException {
+        this.streamName = streamName;
+        this.bk = bk;
+        this.metadata = split.getMetadata();
+        try {
+            this.lh = bk.openLedgerNoRecovery(
+                    split.getLedgerId(),
+                    BookKeeper.DigestType.CRC32,
+                    conf.getBKDigestPW().getBytes(UTF_8));
+        } catch (BKException e) {
+            throw new IOException(e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+
+Reading records from the *data split* is effectively reading records from the distributedlog
+log segment.
+
+::
+
+    try {
+        Enumeration<LedgerEntry> entries =
+                lh.readEntries(entryId, entryId);
+        if (entries.hasMoreElements()) {
+            LedgerEntry entry = entries.nextElement();
+            Entry.newBuilder()
+                    .setLogSegmentInfo(metadata.getLogSegmentSequenceNumber(),
+                            metadata.getStartSequenceId())
+                    .setEntryId(entry.getEntryId())
+                    .setEnvelopeEntry(
+                            LogSegmentMetadata.supportsEnvelopedEntries(metadata.getVersion()))
+                    .deserializeRecordSet(true)
+                    .setInputStream(entry.getEntryInputStream())
+                    .buildReader();
+        }
+        return nextKeyValue();
+    } catch (BKException e) {
+        throw new IOException(e);
+    }
+
+
+We could calculate the progress by comparing the position with the record count of this log segment.
+
+::
+
+    @Override
+    public float getProgress()
+            throws IOException, InterruptedException {
+        if (metadata.getRecordCount() > 0) {
+            return ((float) (readPos + 1)) / metadata.getRecordCount();
+        }
+        return 1;
+    }
+
+
+Once we have the *LogSegmentSplit* and the *LogSegmentReader* over a split, we could hook them up to
+implement distributedlog's InputFormat. Please check out the code_ for more details.
+
+.. _code: https://github.com/apache/incubator-distributedlog/tree/master/distributedlog-tutorials/distributedlog-mapreduce
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b169db04/docs/tutorials/basic-1.rst
----------------------------------------------------------------------
diff --git a/docs/tutorials/basic-1.rst b/docs/tutorials/basic-1.rst
index 20be936..47a4bad 100644
--- a/docs/tutorials/basic-1.rst
+++ b/docs/tutorials/basic-1.rst
@@ -1 +1,250 @@
-.. markdowninclude:: ../../distributedlog-tutorials/distributedlog-basic/basic-1.md
+---
+title: API - Write Records (via core library)
+top-nav-group: quickstart
+top-nav-pos: 2
+top-nav-title: API - Write Records (via core library)
+layout: default
+---
+
+.. contents:: Basic Tutorial - Using the Core Library to write records
+
+Basic Tutorial - Write Records using the Core Library
+=====================================================
+
+This tutorial shows how to write records using the core library.
+
+.. sectnum::
+
+Open a writer
+~~~~~~~~~~~~~
+
+Before everything, you have to open a writer to write records.
+These are the steps to follow to `open a writer`.
+
+Create distributedlog URI
+-------------------------
+
+::
+
+    String dlUriStr = ...;
+    URI uri = URI.create(dlUriStr);
+
+Create distributedlog configuration
+-----------------------------------
+
+::
+
+    DistributedLogConfiguration conf = new DistributedLogConfiguration();
+
+
+Enable immediate flush
+----------------------
+
+::
+
+    conf.setImmediateFlushEnabled(true);
+    conf.setOutputBufferSize(0);
+    conf.setPeriodicFlushFrequencyMilliSeconds(0);
+
+
+Enable immediate locking
+------------------------
+
+So if there is already a writer writing to the stream, opening another writer will
+fail because the previous writer already holds the lock.
+
+::
+
+    conf.setLockTimeout(DistributedLogConstants.LOCK_IMMEDIATE);
+
+
+Build the distributedlog namespace
+----------------------------------
+
+::
+
+    DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+            .conf(conf)
+            .uri(uri)
+            .regionId(DistributedLogConstants.LOCAL_REGION_ID)
+            .clientId("console-writer")
+            .build();
+
+
+Open the writer
+---------------
+
+::
+
+    DistributedLogManager dlm = namespace.openLog("basic-stream-1");
+    AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
+
+
+Write Records
+~~~~~~~~~~~~~
+
+Once you get a `writer` instance, you can start writing `records` into the stream.
+
+Construct a log record
+----------------------
+
+Here let's use `System.currentTimeMillis()` as the `TransactionID`.
+
+::
+
+    byte[] data = ...;
+    LogRecord record = new LogRecord(System.currentTimeMillis(), data);
+
+
+Write the log record
+--------------------
+
+::
+
+    Future<DLSN> writeFuture = writer.write(record);
+
+
+Register the write callback
+---------------------------
+
+Register a future listener on write completion. The writer will be notified once the write is completed.
+
+::
+
+    writeFuture.addEventListener(new FutureEventListener<DLSN>() {
+        @Override
+        public void onFailure(Throwable cause) {
+            // executed when write failed.
+        }
+
+        @Override
+        public void onSuccess(DLSN value) {
+            // executed when write completed.
+        }
+    });
+
+
+Close the writer
+~~~~~~~~~~~~~~~~
+
+Close the writer after usage
+----------------------------
+
+::
+
+    FutureUtils.result(writer.asyncClose());
+
+
+Run the tutorial
+~~~~~~~~~~~~~~~~
+
+Run the example in the following steps:
+
+Start the local bookkeeper cluster
+----------------------------------
+
+You can use the following command to start the distributedlog stack locally.
+After distributedlog is started, you can access it using the
+distributedlog uri *distributedlog://127.0.0.1:7000/messaging/distributedlog*.
+
+::
+
+    // dlog local ${zk-port}
+    ./distributedlog-core/bin/dlog local 7000
+
+
+Create the stream
+-----------------
+
+::
+
+    // Create Stream `basic-stream-1`
+    // dlog tool create -u ${distributedlog-uri} -r ${stream-prefix} -e ${stream-regex}
+    ./distributedlog-core/bin/dlog tool create -u distributedlog://127.0.0.1:7000/messaging/distributedlog -r basic-stream- -e 1
+
+
+Tail the stream
+---------------
+
+Tail the stream using `TailReader` to wait for new records.
+
+::
+
+    // Tailing Stream `basic-stream-1`
+    // runner run com.twitter.distributedlog.basic.TailReader ${distributedlog-uri} ${stream}
+    ./distributedlog-tutorials/distributedlog-basic/bin/runner run com.twitter.distributedlog.basic.TailReader distributedlog://127.0.0.1:7000/messaging/distributedlog basic-stream-1
+
+
+Write records
+-------------
+
+Run the example to write records to the stream in a console.
+
+::
+
+    // Write Records into Stream `basic-stream-1`
+    // runner run com.twitter.distributedlog.basic.ConsoleWriter ${distributedlog-uri} ${stream}
+    ./distributedlog-tutorials/distributedlog-basic/bin/runner run com.twitter.distributedlog.basic.ConsoleWriter distributedlog://127.0.0.1:7000/messaging/distributedlog basic-stream-1
+
+
+Check the results
+-----------------
+
+Example output from `ConsoleWriter` and `TailReader`.
+
+::
+
+    // Output of `ConsoleWriter`
+    Opening log stream basic-stream-1
+    [dlog] > test!
+    [dlog] >
+
+
+    // Output of `TailReader`
+    Opening log stream basic-stream-1
+    Log stream basic-stream-1 is empty.
+    Wait for records starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
+    Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
+    """
+    test!
+    """
+
+Attempt a second writer
+-----------------------
+
+Open another terminal to run `ConsoleWriter`. It would fail with `OwnershipAcquireFailedException` as the previous
+`ConsoleWriter` is still holding the lock on stream `basic-stream-1`.
+
+::
+
+    Opening log stream basic-stream-1
+    Exception in thread "main" com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException: LockPath - /messaging/distributedlog/basic-stream-1/<default>/lock: Lock acquisition failed, the current owner is console-writer
+        at com.twitter.distributedlog.lock.ZKSessionLock$8.apply(ZKSessionLock.java:570)
+        at com.twitter.distributedlog.lock.ZKSessionLock$8.apply(ZKSessionLock.java:567)
+        at com.twitter.util.Future$$anonfun$map$1$$anonfun$apply$8.apply(Future.scala:1041)
+        at com.twitter.util.Try$.apply(Try.scala:13)
+        at com.twitter.util.Future$.apply(Future.scala:132)
+        at com.twitter.util.Future$$anonfun$map$1.apply(Future.scala:1041)
+        at com.twitter.util.Future$$anonfun$map$1.apply(Future.scala:1040)
+        at com.twitter.util.Promise$Transformer.liftedTree1$1(Promise.scala:112)
+        at com.twitter.util.Promise$Transformer.k(Promise.scala:112)
+        at com.twitter.util.Promise$Transformer.apply(Promise.scala:122)
+        at com.twitter.util.Promise$Transformer.apply(Promise.scala:103)
+        at com.twitter.util.Promise$$anon$1.run(Promise.scala:357)
+        at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:178)
+        at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:136)
+        at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:207)
+        at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:92)
+        at com.twitter.util.Promise.runq(Promise.scala:350)
+        at com.twitter.util.Promise.updateIfEmpty(Promise.scala:716)
+        at com.twitter.util.Promise.update(Promise.scala:694)
+        at com.twitter.util.Promise.setValue(Promise.scala:670)
+        at com.twitter.distributedlog.lock.ZKSessionLock$9.safeRun(ZKSessionLock.java:622)
+        at org.apache.bookkeeper.util.SafeRunnable.run(SafeRunnable.java:31)
+        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
+        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
+        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
+        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
+        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
+        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
+        at java.lang.Thread.run(Thread.java:745)
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b169db04/docs/tutorials/basic-2.rst
----------------------------------------------------------------------
diff --git a/docs/tutorials/basic-2.rst b/docs/tutorials/basic-2.rst
index f0fe6b0..10fb794 100644
--- a/docs/tutorials/basic-2.rst
+++ b/docs/tutorials/basic-2.rst
@@ -1 +1,221 @@
-.. markdowninclude:: ../../distributedlog-tutorials/distributedlog-basic/basic-2.md
+---
+title: API - Write Records (via write proxy)
+top-nav-group: quickstart
+top-nav-pos: 3
+top-nav-title: API - Write Records (via write proxy)
+layout: default
+---
+
+.. contents:: Basic Tutorial - Using the Proxy Client to write records
+
+Basic Tutorial - Write Records using the Write Proxy Client
+===========================================================
+
+This tutorial shows how to write records using the write proxy client.
+
+.. sectnum::
+
+Open a write proxy client
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Create write proxy client builder
+---------------------------------
+
+::
+
+    DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder()
+        .clientId(ClientId.apply("console-proxy-writer"))
+        .name("console-proxy-writer");
+
+
+Enable thrift mux
+-----------------
+
+::
+
+    builder = builder.thriftmux(true);
+
+
+Point the client to the write proxy using a finagle name
+--------------------------------------------------------
+
+::
+
+    String finagleNameStr = "inet!127.0.0.1:8000";
+    builder = builder.finagleNameStr(finagleNameStr);
+
+
+Build the write proxy client
+----------------------------
+
+::
+
+    DistributedLogClient client = builder.build();
+
+
+Write Records
+~~~~~~~~~~~~~
+
+Write records to a stream
+-------------------------
+
+Applications do not have to provide a `TransactionID` when writing.
+The `TransactionID` of a record is assigned by the write proxy.
+
+::
+
+    String streamName = "basic-stream-2";
+    byte[] data = ...;
+    Future<DLSN> writeFuture = client.write(streamName, ByteBuffer.wrap(data));
+
+
+Register the write callback
+---------------------------
+
+Register a future listener on write completion. The writer will be notified once the write is completed.
+
+::
+
+    writeFuture.addEventListener(new FutureEventListener<DLSN>() {
+        @Override
+        public void onFailure(Throwable cause) {
+            // executed when write failed.
+        }
+
+        @Override
+        public void onSuccess(DLSN value) {
+            // executed when write completed.
+        }
+    });
+
+
+Close the write proxy client
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Close the write proxy client after usage.
+
+::
+
+    client.close();
+
+
+Run the tutorial
+~~~~~~~~~~~~~~~~
+
+Run the example in the following steps:
+
+Start the local bookkeeper cluster
+----------------------------------
+
+You can use the following command to start the distributedlog stack locally.
+After the distributedlog cluster is started, you can access it using the
+distributedlog uri *distributedlog://127.0.0.1:7000/messaging/distributedlog*.
+
+::
+
+    // dlog local ${zk-port}
+    ./distributedlog-core/bin/dlog local 7000
+
+
+Start the write proxy
+---------------------
+
+Start the write proxy, listening on port 8000.
+
+::
+
+    // DistributedLogServerApp -p ${service-port} --shard-id ${shard-id} -sp ${stats-port} -u {distributedlog-uri} -mx -c ${conf-file}
+    ./distributedlog-service/bin/dlog com.twitter.distributedlog.service.DistributedLogServerApp -p 8000 --shard-id 1 -sp 8001 -u distributedlog://127.0.0.1:7000/messaging/distributedlog -mx -c ${distributedlog-repo}/distributedlog-service/conf/distributedlog_proxy.conf
+
+
+Create the stream
+-----------------
+
+Create the stream under the distributedlog uri.
+
+::
+
+    // Create Stream `basic-stream-2`
+    // dlog tool create -u ${distributedlog-uri} -r ${stream-prefix} -e ${stream-regex}
+    ./distributedlog-core/bin/dlog tool create -u distributedlog://127.0.0.1:7000/messaging/distributedlog -r basic-stream- -e 2
+
+
+Tail the stream
+---------------
+
+Tail the stream using `TailReader` to wait for new records.
+
+::
+
+    // Tailing Stream `basic-stream-2`
+    // runner run com.twitter.distributedlog.basic.TailReader ${distributedlog-uri} ${stream}
+    ./distributedlog-tutorials/distributedlog-basic/bin/runner run com.twitter.distributedlog.basic.TailReader distributedlog://127.0.0.1:7000/messaging/distributedlog basic-stream-2
+
+
+Write records
+-------------
+
+Run the example to write records to the stream in a console.
+
+::
+
+    // Write Records into Stream `basic-stream-2`
+    // runner run com.twitter.distributedlog.basic.ConsoleProxyWriter ${distributedlog-uri} ${stream}
+    ./distributedlog-tutorials/distributedlog-basic/bin/runner run com.twitter.distributedlog.basic.ConsoleProxyWriter 'inet!127.0.0.1:8000' basic-stream-2
+
+
+Check the results
+-----------------
+
+Example output from `ConsoleProxyWriter` and `TailReader`.
+
+::
+
+    // Output of `ConsoleProxyWriter`
+    May 08, 2016 10:27:41 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[inet] = com.twitter.finagle.InetResolver(com.twitter.finagle.InetResolver@756d7bba)
+    May 08, 2016 10:27:41 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[fixedinet] = com.twitter.finagle.FixedInetResolver(com.twitter.finagle.FixedInetResolver@1d2e91f5)
+    May 08, 2016 10:27:41 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[neg] = com.twitter.finagle.NegResolver$(com.twitter.finagle.NegResolver$@5c707aca)
+    May 08, 2016 10:27:41 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[nil] = com.twitter.finagle.NilResolver$(com.twitter.finagle.NilResolver$@5c8d932f)
+    May 08, 2016 10:27:41 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[fail] = com.twitter.finagle.FailResolver$(com.twitter.finagle.FailResolver$@52ba2221)
+    May 08, 2016 10:27:41 AM com.twitter.finagle.Init$$anonfun$1 apply$mcV$sp
+    [dlog] > test-proxy-writer
+    [dlog] >
+
+
+    // Output of `TailReader`
+    Opening log stream basic-stream-2
+    Log stream basic-stream-2 is empty.
+    Wait for records starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
+    Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
+    """
+    test-proxy-writer
+    """
+
+
+Attempt a second writer
+-----------------------
+
+Open another terminal to run `ConsoleProxyWriter`. The write should succeed as the write proxy is able to accept
+fan-in writes. Please check out the `Considerations` section to see the difference between **Write Ordering** and
+**Read Ordering**.
+
+::
+
+    May 08, 2016 10:31:54 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[inet] = com.twitter.finagle.InetResolver(com.twitter.finagle.InetResolver@756d7bba)
+    May 08, 2016 10:31:54 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[fixedinet] = com.twitter.finagle.FixedInetResolver(com.twitter.finagle.FixedInetResolver@1d2e91f5)
+    May 08, 2016 10:31:54 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[neg] = com.twitter.finagle.NegResolver$(com.twitter.finagle.NegResolver$@5c707aca)
+    May 08, 2016 10:31:54 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[nil] = com.twitter.finagle.NilResolver$(com.twitter.finagle.NilResolver$@5c8d932f)
+    May 08, 2016 10:31:54 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[fail] = com.twitter.finagle.FailResolver$(com.twitter.finagle.FailResolver$@52ba2221)
+    May 08, 2016 10:31:54 AM com.twitter.finagle.Init$$anonfun$1 apply$mcV$sp
+    [dlog] > test-write-proxy-message-2
+    [dlog] >
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b169db04/docs/tutorials/basic-3.rst
----------------------------------------------------------------------
diff --git a/docs/tutorials/basic-3.rst b/docs/tutorials/basic-3.rst
index f2853fc..26f3336 100644
--- a/docs/tutorials/basic-3.rst
+++ b/docs/tutorials/basic-3.rst
@@ -1 +1,280 @@
-.. markdowninclude:: ../../distributedlog-tutorials/distributedlog-basic/basic-3.md
+---
+title: API - Write Records to Multiple Streams
+layout: default
+---
+
+.. contents:: Basic Tutorial - Write Records to Multiple Streams
+
+Write Records to Multiple Streams
+=================================
+
+This tutorial shows how to write records using the write proxy multi-stream writer. The `DistributedLogMultiStreamWriter`
+is a wrapper over `DistributedLogClient` that writes records to a set of streams in a `round-robin` way and ensures low write latency even on single-stream ownership failover.
+
+.. sectnum::
+
+Open a write proxy client
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Before everything, you have to open a write proxy client to write records.
+These are the steps to follow to `open a write proxy client`.
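[Editor's note: for orientation, the four steps below assemble into a single builder chain. A condensed sketch, using exactly the calls shown in the steps that follow:]

::

    DistributedLogClient client = DistributedLogClientBuilder.newBuilder()
        .clientId(ClientId.apply("console-proxy-writer"))  // identify this client
        .name("console-proxy-writer")
        .thriftmux(true)                                   // enable thrift mux
        .finagleNameStr("inet!127.0.0.1:8000")             // point at the write proxy
        .build();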
+ +Create write proxy client builder +--------------------------------- + +:: + + DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder() + .clientId(ClientId.apply("console-proxy-writer")) + .name("console-proxy-writer"); + + +Enable thrift mux +----------------- + +:: + + builder = builder.thriftmux(true); + + +Point the client to write proxy using finagle name +-------------------------------------------------- + +:: + + String finagleNameStr = "inet!127.0.0.1:8000"; + builder = builder.finagleNameStr(finagleNameStr); + + +Build the write proxy client +---------------------------- + +:: + + DistributedLogClient client = builder.build(); + + +Create a `MultiStreamWriter` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Create multi stream writer builder +---------------------------------- + +:: + + DistributedLogMultiStreamWriterBuilder builder = DistributedLogMultiStreamWriter.newBuilder(); + + +Build the writer to write a set of streams +------------------------------------------ + +:: + + List<String> streams = ...; + builder = builder.streams(streams); + + +Point the multi stream writer to use write proxy client +------------------------------------------------------- + +:: + + builder = builder.client(client); + + +Configure the flush policy for the multi stream writer +------------------------------------------------------ + +:: + + // transmit immediately after a record is written. + builder = builder.bufferSize(0); + builder = builder.flushIntervalMs(0); + + +Configure the request timeouts and retry policy for the multi stream writer +--------------------------------------------------------------------------- + +:: + + // Configure the speculative timeouts - if writing to a stream cannot + // complete within the speculative timeout, it would try writing to + // another streams. + builder = builder.firstSpeculativeTimeoutMs(10000) + builder = builder.maxSpeculativeTimeoutMs(20000) + // Configure the request timeout. + builder = builder.requestTimeoutMs(50000); + + +Build the multi writer +---------------------- + +:: + + DistributedLogMultiStreamWriter writer = builder.build(); + + +Write Records +~~~~~~~~~~~~~ + +Write records to multi streams +------------------------------ + +:: + + byte[] data = ...; + Future<DLSN> writeFuture = writer.write(ByteBuffer.wrap(data)); + + +Register the write callback +--------------------------- + +Register a future listener on write completion. + +:: + + writeFuture.addEventListener(new FutureEventListener<DLSN>() { + @Override + public void onFailure(Throwable cause) { + // executed when write failed. + } + + @Override + public void onSuccess(DLSN value) { + // executed when write completed. + } + }); + + +Run the tutorial +~~~~~~~~~~~~~~~~ + +Run the example in the following steps: + +Start the local bookkeeper cluster +---------------------------------- + +You can use follow command to start the distributedlog stack locally. +After the distributedlog is started, you could access it using +distributedlog uri *distributedlog://127.0.0.1:7000/messaging/distributedlog*. + +:: + + // dlog local ${zk-port} + ./distributedlog-core/bin/dlog local 7000 + + +Start the write proxy +--------------------- + +Start the write proxy, listening on port 8000. 
+
+
+Run the tutorial
+~~~~~~~~~~~~~~~~
+
+Run the example in the following steps:
+
+Start the local bookkeeper cluster
+----------------------------------
+
+You can use the following command to start the distributedlog stack locally.
+After the distributedlog stack is started, you can access it using the
+distributedlog uri *distributedlog://127.0.0.1:7000/messaging/distributedlog*.
+
+::
+
+    // dlog local ${zk-port}
+    ./distributedlog-core/bin/dlog local 7000
+
+
+Start the write proxy
+---------------------
+
+Start the write proxy, listening on port 8000.
+
+::
+
+    // DistributedLogServerApp -p ${service-port} --shard-id ${shard-id} -sp ${stats-port} -u {distributedlog-uri} -mx -c ${conf-file}
+    ./distributedlog-service/bin/dlog com.twitter.distributedlog.service.DistributedLogServerApp -p 8000 --shard-id 1 -sp 8001 -u distributedlog://127.0.0.1:7000/messaging/distributedlog -mx -c ${distributedlog-repo}/distributedlog-service/conf/distributedlog_proxy.conf
+
+
+Create multiple streams
+-----------------------
+
+Create multiple streams under the distributedlog uri.
+
+::
+
+    // Create Stream `basic-stream-{3-7}`
+    // dlog tool create -u ${distributedlog-uri} -r ${stream-prefix} -e ${stream-regex}
+    ./distributedlog-core/bin/dlog tool create -u distributedlog://127.0.0.1:7000/messaging/distributedlog -r basic-stream- -e 3-7
+
+
+Tail the streams
+----------------
+
+Tail the streams using `MultiReader` and wait for new records.
+
+::
+
+    // Tailing Stream `basic-stream-{3-7}`
+    // runner run com.twitter.distributedlog.basic.MultiReader ${distributedlog-uri} ${stream}[,${stream}]
+    ./distributedlog-tutorials/distributedlog-basic/bin/runner run com.twitter.distributedlog.basic.MultiReader distributedlog://127.0.0.1:7000/messaging/distributedlog basic-stream-3,basic-stream-4,basic-stream-5,basic-stream-6,basic-stream-7
+
+
+Write the records
+-----------------
+
+Run the example to write records to the streams from a console.
+
+::
+
+    // Write Records into Stream `basic-stream-{3-7}`
+    // runner run com.twitter.distributedlog.basic.ConsoleProxyMultiWriter ${distributedlog-uri} ${stream}[,${stream}]
+    ./distributedlog-tutorials/distributedlog-basic/bin/runner run com.twitter.distributedlog.basic.ConsoleProxyMultiWriter 'inet!127.0.0.1:8000' basic-stream-3,basic-stream-4,basic-stream-5,basic-stream-6,basic-stream-7
+
+Check the results
+-----------------
+
+Example output from `ConsoleProxyMultiWriter` and `MultiReader`.
+
+::
+
+    // Output of `ConsoleProxyMultiWriter`
+    May 08, 2016 11:09:21 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[inet] = com.twitter.finagle.InetResolver(com.twitter.finagle.InetResolver@fbb628c)
+    May 08, 2016 11:09:21 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[fixedinet] = com.twitter.finagle.FixedInetResolver(com.twitter.finagle.FixedInetResolver@5a25adb1)
+    May 08, 2016 11:09:21 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[neg] = com.twitter.finagle.NegResolver$(com.twitter.finagle.NegResolver$@5fae6db3)
+    May 08, 2016 11:09:21 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[nil] = com.twitter.finagle.NilResolver$(com.twitter.finagle.NilResolver$@34a433d8)
+    May 08, 2016 11:09:21 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[fail] = com.twitter.finagle.FailResolver$(com.twitter.finagle.FailResolver$@847c4e8)
+    May 08, 2016 11:09:22 AM com.twitter.finagle.Init$$anonfun$1 apply$mcV$sp
+    [dlog] > message-1
+    [dlog] > message-2
+    [dlog] > message-3
+    [dlog] > message-4
+    [dlog] > message-5
+    [dlog] >
+
+
+    // Output of `MultiReader`
+    Opening log stream basic-stream-3
+    Opening log stream basic-stream-4
+    Opening log stream basic-stream-5
+    Opening log stream basic-stream-6
+    Opening log stream basic-stream-7
+    Log stream basic-stream-4 is empty.
+    Wait for records from basic-stream-4 starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
+    Open reader to read records from stream basic-stream-4
+    Log stream basic-stream-5 is empty.
+    Wait for records from basic-stream-5 starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
+    Open reader to read records from stream basic-stream-5
+    Log stream basic-stream-6 is empty.
+    Wait for records from basic-stream-6 starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
+    Open reader to read records from stream basic-stream-6
+    Log stream basic-stream-3 is empty.
+    Wait for records from basic-stream-3 starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
+    Open reader to read records from stream basic-stream-3
+    Log stream basic-stream-7 is empty.
+    Wait for records from basic-stream-7 starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
+    Open reader to read records from stream basic-stream-7
+    Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0} from stream basic-stream-4
+    """
+    message-1
+    """
+    Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0} from stream basic-stream-6
+    """
+    message-2
+    """
+    Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0} from stream basic-stream-3
+    """
+    message-3
+    """
+    Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0} from stream basic-stream-7
+    """
+    message-4
+    """
+    Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0} from stream basic-stream-5
+    """
+    message-5
+    """

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b169db04/docs/tutorials/basic-4.rst
----------------------------------------------------------------------
diff --git a/docs/tutorials/basic-4.rst b/docs/tutorials/basic-4.rst
index eb09753..fe905d3 100644
--- a/docs/tutorials/basic-4.rst
+++ b/docs/tutorials/basic-4.rst
@@ -1 +1,241 @@
-.. markdowninclude:: ../../distributedlog-tutorials/distributedlog-basic/basic-4.md
+---
+title: API - Atomic Write Multiple Records
+layout: default
+---
+
+.. contents:: Basic Tutorial - Write Multiple Records Atomically using the Write Proxy Client
+
+Write Multiple Records Atomically using the Write Proxy Client
+==============================================================
+
+This tutorial shows how to write multiple records atomically using the write proxy client.
+
+.. sectnum::
+
+Open a write proxy client
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Create write proxy client builder
+---------------------------------
+
+::
+
+    DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder()
+        .clientId(ClientId.apply("atomic-writer"))
+        .name("atomic-writer");
+
+
+Enable thrift mux
+-----------------
+
+::
+
+    builder = builder.thriftmux(true);
+
+
+Point the client to write proxy using finagle name
+--------------------------------------------------
+
+::
+
+    String finagleNameStr = "inet!127.0.0.1:8000";
+    builder = builder.finagleNameStr(finagleNameStr);
+
+
+Build the write proxy client
+----------------------------
+
+::
+
+    DistributedLogClient client = builder.build();
+
+
+Write Records
+~~~~~~~~~~~~~
+
+Create a RecordSet
+------------------
+
+Create a `RecordSet` for multiple records. The `RecordSet` below has an initial `16KB` buffer and
+uses compression codec `NONE`.
+
+::
+
+    LogRecordSet.Writer recordSetWriter = LogRecordSet.newWriter(16 * 1024, Type.NONE);
+
+
+Write multiple records
+----------------------
+
+Write multiple records into the `RecordSet`.
+
+::
+
+    for (String msg : messages) {
+        ByteBuffer msgBuf = ByteBuffer.wrap(msg.getBytes(UTF_8));
+        Promise<DLSN> writeFuture = new Promise<DLSN>();
+        recordSetWriter.writeRecord(msgBuf, writeFuture);
+    }
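+
+Each `writeRecord` call attaches a `Promise` that is satisfied only after the whole record set is
+transmitted (see the following steps), so it is useful to keep the promises around. A minimal
+sketch, not from the original tutorial, assuming a hypothetical `recordFutures` list and the
+`messages` list from above:
+
+::
+
+    import java.util.ArrayList;
+    import java.util.List;
+
+    import com.twitter.util.Future;
+    import com.twitter.util.Promise;
+
+    List<Future<DLSN>> recordFutures = new ArrayList<Future<DLSN>>(messages.size());
+    for (String msg : messages) {
+        ByteBuffer msgBuf = ByteBuffer.wrap(msg.getBytes(UTF_8));
+        Promise<DLSN> writeFuture = new Promise<DLSN>();
+        // The promise yields the record's DLSN once the record set is transmitted.
+        recordSetWriter.writeRecord(msgBuf, writeFuture);
+        recordFutures.add(writeFuture);
+    }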
+
+
+Write the RecordSet
+-------------------
+
+Write the `RecordSet` to a stream.
+
+::
+
+    String streamName = "basic-stream-8";
+    Future<DLSN> writeFuture = client.writeRecordSet(streamName, recordSetWriter);
+
+
+Register the write callback
+---------------------------
+
+Register a future listener on write completion. The writer will be notified once the write completes.
+
+::
+
+    writeFuture.addEventListener(new FutureEventListener<DLSN>() {
+        @Override
+        public void onFailure(Throwable cause) {
+            // executed when the write fails.
+            recordSetWriter.abortTransmit(cause);
+        }
+
+        @Override
+        public void onSuccess(DLSN dlsn) {
+            // executed when the write completes.
+            recordSetWriter.completeTransmit(
+                dlsn.getLogSegmentSequenceNo(),
+                dlsn.getEntryId(),
+                dlsn.getSlotId());
+        }
+    });
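+
+The per-record promises are only satisfied once `completeTransmit` runs (or failed by
+`abortTransmit`), which is what the listener above wires up. After registering it you can block
+until every record has been assigned a `DLSN`. A sketch, assuming the hypothetical
+`recordFutures` list collected earlier and `FutureUtils` from `com.twitter.distributedlog.util`:
+
+::
+
+    // Wait for each record in the set to be assigned a DLSN.
+    for (Future<DLSN> recordFuture : recordFutures) {
+        DLSN recordDlsn = FutureUtils.result(recordFuture);
+        System.out.println("record written as " + recordDlsn);
+    }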
+
+
+Close the write proxy client
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Close the write proxy client after usage.
+
+::
+
+    client.close();
+
+
+Run the tutorial
+~~~~~~~~~~~~~~~~
+
+Run the example in the following steps:
+
+Start the local bookkeeper cluster
+----------------------------------
+
+You can use the following command to start the distributedlog stack locally.
+After the distributedlog cluster is started, you can access it using the
+distributedlog uri *distributedlog://127.0.0.1:7000/messaging/distributedlog*.
+
+::
+
+    // dlog local ${zk-port}
+    ./distributedlog-core/bin/dlog local 7000
+
+
+Start the write proxy
+---------------------
+
+Start the write proxy, listening on port 8000.
+
+::
+
+    // DistributedLogServerApp -p ${service-port} --shard-id ${shard-id} -sp ${stats-port} -u {distributedlog-uri} -mx -c ${conf-file}
+    ./distributedlog-service/bin/dlog com.twitter.distributedlog.service.DistributedLogServerApp -p 8000 --shard-id 1 -sp 8001 -u distributedlog://127.0.0.1:7000/messaging/distributedlog -mx -c ${distributedlog-repo}/distributedlog-service/conf/distributedlog_proxy.conf
+
+
+Create the stream
+-----------------
+
+Create the stream under the distributedlog uri.
+
+::
+
+    // Create Stream `basic-stream-8`
+    // dlog tool create -u ${distributedlog-uri} -r ${stream-prefix} -e ${stream-regex}
+    ./distributedlog-core/bin/dlog tool create -u distributedlog://127.0.0.1:7000/messaging/distributedlog -r basic-stream- -e 8
+
+
+Tail the stream
+---------------
+
+Tail the stream using `TailReader` and wait for new records.
+
+::
+
+    // Tailing Stream `basic-stream-8`
+    // runner run com.twitter.distributedlog.basic.TailReader ${distributedlog-uri} ${stream}
+    ./distributedlog-tutorials/distributedlog-basic/bin/runner run com.twitter.distributedlog.basic.TailReader distributedlog://127.0.0.1:7000/messaging/distributedlog basic-stream-8
+
+
+Write records
+-------------
+
+Run the example to write multiple records to the stream.
+
+::
+
+    // Write Records into Stream `basic-stream-8`
+    // runner run com.twitter.distributedlog.basic.AtomicWriter ${distributedlog-uri} ${stream} ${message}[, ${message}]
+    ./distributedlog-tutorials/distributedlog-basic/bin/runner run com.twitter.distributedlog.basic.AtomicWriter 'inet!127.0.0.1:8000' basic-stream-8 "message-1" "message-2" "message-3" "message-4" "message-5"
+
+
+Check the results
+-----------------
+
+Example output from `AtomicWriter` and `TailReader`.
+
+::
+
+    // Output of `AtomicWriter`
+    May 08, 2016 11:48:19 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[inet] = com.twitter.finagle.InetResolver(com.twitter.finagle.InetResolver@6c3e459e)
+    May 08, 2016 11:48:19 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[fixedinet] = com.twitter.finagle.FixedInetResolver(com.twitter.finagle.FixedInetResolver@4d5698f)
+    May 08, 2016 11:48:19 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[neg] = com.twitter.finagle.NegResolver$(com.twitter.finagle.NegResolver$@57052dc3)
+    May 08, 2016 11:48:19 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[nil] = com.twitter.finagle.NilResolver$(com.twitter.finagle.NilResolver$@14ff89d7)
+    May 08, 2016 11:48:19 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[fail] = com.twitter.finagle.FailResolver$(com.twitter.finagle.FailResolver$@14b28d06)
+    May 08, 2016 11:48:19 AM com.twitter.finagle.Init$$anonfun$1 apply$mcV$sp
+    Write 'message-1' as record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
+    Write 'message-2' as record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=1}
+    Write 'message-3' as record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=2}
+    Write 'message-4' as record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=3}
+    Write 'message-5' as record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=4}
+
+
+    // Output of `TailReader`
+    Opening log stream basic-stream-8
+    Log stream basic-stream-8 is empty.
+    Wait for records starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
+    Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
+    """
+    message-1
+    """
+    Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=1}
+    """
+    message-2
+    """
+    Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=2}
+    """
+    message-3
+    """
+    Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=3}
+    """
+    message-4
+    """
+    Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=4}
+    """
+    message-5
+    """

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b169db04/docs/tutorials/basic-5.rst
----------------------------------------------------------------------
diff --git a/docs/tutorials/basic-5.rst b/docs/tutorials/basic-5.rst
index af0f8d6..329bba6 100644
--- a/docs/tutorials/basic-5.rst
+++ b/docs/tutorials/basic-5.rst
@@ -1 +1,223 @@
-.. markdowninclude:: ../../distributedlog-tutorials/distributedlog-basic/basic-5.md
+---
+title: API - Read Records
+top-nav-group: quickstart
+top-nav-pos: 4
+top-nav-title: API - Read Records
+layout: default
+---
+
+.. contents:: Basic Tutorial - Tail reading records from a stream
+
+Tail reading records from a stream
+==================================
+
+This tutorial shows how to tail read records from a stream.
+
+.. sectnum::
+
+Open a distributedlog manager
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Create distributedlog URI
+-------------------------
+
+::
+
+    String dlUriStr = ...;
+    URI uri = URI.create(dlUriStr);
+
+
+Create distributedlog configuration
+-----------------------------------
+
+::
+
+    DistributedLogConfiguration conf = new DistributedLogConfiguration();
+
+
+Build the distributedlog namespace
+----------------------------------
+
+::
+
+    DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        .conf(conf)
+        .uri(uri)
+        .build();
+
+
+Open the distributedlog manager
+-------------------------------
+
+::
+
+    DistributedLogManager dlm = namespace.openLog("basic-stream-9");
+
+
+Get Last Record
+~~~~~~~~~~~~~~~
+
+Get the last record of the stream. We can then use the `DLSN` of the last record
+to start tailing the stream.
+
+::
+
+    LogRecordWithDLSN record = dlm.getLastLogRecord();
+    DLSN lastDLSN = record.getDlsn();
+
+
+Read Records
+~~~~~~~~~~~~
+
+Open the stream
+---------------
+
+Open the stream to start reading records.
+
+::
+
+    AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(lastDLSN));
+
+
+Read the records
+----------------
+
+Read the next available record from the stream. The future is satisfied when the record is available.
+
+::
+
+    Future<LogRecordWithDLSN> readFuture = reader.readNext();
+
+
+Register the read callback
+--------------------------
+
+Register a future listener on read completion. The reader will be notified once a record is ready for consumption.
+
+::
+
+    final FutureEventListener<LogRecordWithDLSN> readListener = new FutureEventListener<LogRecordWithDLSN>() {
+        @Override
+        public void onFailure(Throwable cause) {
+            // executed when the read fails.
+        }
+
+        @Override
+        public void onSuccess(LogRecordWithDLSN record) {
+            // process the record
+            ...
+            // issue the next read
+            reader.readNext().addEventListener(this);
+        }
+    };
+    readFuture.addEventListener(readListener);
+
+
+Close the reader
+~~~~~~~~~~~~~~~~
+
+Close the reader after usage.
+
+::
+
+    FutureUtils.result(reader.asyncClose());
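+
+Putting the read path together: a minimal end-to-end sketch (not part of the original tutorial).
+The import paths and the URI are assumptions, and error handling and shutdown are omitted.
+
+::
+
+    import java.net.URI;
+
+    import com.twitter.distributedlog.AsyncLogReader;
+    import com.twitter.distributedlog.DLSN;
+    import com.twitter.distributedlog.DistributedLogConfiguration;
+    import com.twitter.distributedlog.DistributedLogManager;
+    import com.twitter.distributedlog.LogRecordWithDLSN;
+    import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+    import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
+    import com.twitter.distributedlog.util.FutureUtils;
+    import com.twitter.util.FutureEventListener;
+
+    URI uri = URI.create("distributedlog://127.0.0.1:7000/messaging/distributedlog");
+    DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        .conf(new DistributedLogConfiguration())
+        .uri(uri)
+        .build();
+    DistributedLogManager dlm = namespace.openLog("basic-stream-9");
+
+    // Start tailing from the DLSN of the current last record.
+    DLSN lastDLSN = dlm.getLastLogRecord().getDlsn();
+    final AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(lastDLSN));
+
+    reader.readNext().addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
+        @Override
+        public void onFailure(Throwable cause) {
+            // the read failed; real code would close the reader and the manager here.
+        }
+
+        @Override
+        public void onSuccess(LogRecordWithDLSN record) {
+            System.out.println("Received record " + record.getDlsn());
+            // re-register to keep tailing.
+            reader.readNext().addEventListener(this);
+        }
+    });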
+
+
+Run the tutorial
+~~~~~~~~~~~~~~~~
+
+Run the example in the following steps:
+
+Start the local bookkeeper cluster
+----------------------------------
+
+You can use the following command to start the distributedlog stack locally.
+After the distributedlog cluster is started, you can access it using the
+distributedlog uri *distributedlog://127.0.0.1:7000/messaging/distributedlog*.
+
+::
+
+    // dlog local ${zk-port}
+    ./distributedlog-core/bin/dlog local 7000
+
+
+Start the write proxy
+---------------------
+
+Start the write proxy, listening on port 8000.
+
+::
+
+    // DistributedLogServerApp -p ${service-port} --shard-id ${shard-id} -sp ${stats-port} -u {distributedlog-uri} -mx -c ${conf-file}
+    ./distributedlog-service/bin/dlog com.twitter.distributedlog.service.DistributedLogServerApp -p 8000 --shard-id 1 -sp 8001 -u distributedlog://127.0.0.1:7000/messaging/distributedlog -mx -c ${distributedlog-repo}/distributedlog-service/conf/distributedlog_proxy.conf
+
+
+Create the stream
+-----------------
+
+Create the stream under the distributedlog uri.
+
+::
+
+    // Create Stream `basic-stream-9`
+    // dlog tool create -u ${distributedlog-uri} -r ${stream-prefix} -e ${stream-regex}
+    ./distributedlog-core/bin/dlog tool create -u distributedlog://127.0.0.1:7000/messaging/distributedlog -r basic-stream- -e 9
+
+
+Tail the stream
+---------------
+
+Tail the stream using `TailReader` and wait for new records.
+
+::
+
+    // Tailing Stream `basic-stream-9`
+    // runner run com.twitter.distributedlog.basic.TailReader ${distributedlog-uri} ${stream}
+    ./distributedlog-tutorials/distributedlog-basic/bin/runner run com.twitter.distributedlog.basic.TailReader distributedlog://127.0.0.1:7000/messaging/distributedlog basic-stream-9
+
+
+Write records
+-------------
+
+Run the example to write records to the stream from a console.
+
+::
+
+    // Write Records into Stream `basic-stream-9`
+    // runner run com.twitter.distributedlog.basic.ConsoleProxyWriter ${distributedlog-uri} ${stream}
+    ./distributedlog-tutorials/distributedlog-basic/bin/runner run com.twitter.distributedlog.basic.ConsoleProxyWriter 'inet!127.0.0.1:8000' basic-stream-9
+
+
+Check the results
+-----------------
+
+Example output from `ConsoleProxyWriter` and `TailReader`.
+
+::
+
+    // Output of `ConsoleProxyWriter`
+    May 08, 2016 10:27:41 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[inet] = com.twitter.finagle.InetResolver(com.twitter.finagle.InetResolver@756d7bba)
+    May 08, 2016 10:27:41 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[fixedinet] = com.twitter.finagle.FixedInetResolver(com.twitter.finagle.FixedInetResolver@1d2e91f5)
+    May 08, 2016 10:27:41 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[neg] = com.twitter.finagle.NegResolver$(com.twitter.finagle.NegResolver$@5c707aca)
+    May 08, 2016 10:27:41 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[nil] = com.twitter.finagle.NilResolver$(com.twitter.finagle.NilResolver$@5c8d932f)
+    May 08, 2016 10:27:41 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
+    INFO: Resolver[fail] = com.twitter.finagle.FailResolver$(com.twitter.finagle.FailResolver$@52ba2221)
+    May 08, 2016 10:27:41 AM com.twitter.finagle.Init$$anonfun$1 apply$mcV$sp
+    [dlog] > test-proxy-writer
+    [dlog] >
+
+
+    // Output of `TailReader`
+    Opening log stream basic-stream-9
+    Log stream basic-stream-9 is empty.
+    Wait for records starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
+    Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
+    """
+    test-proxy-writer
+    """
+
