[GitHub] incubator-distributedlog pull request #134: Fix merge script issues

2017-06-12 Thread sijie
GitHub user sijie opened a pull request:

https://github.com/apache/incubator-distributedlog/pull/134

Fix merge script issues

Problem:

Currently the merge script is broken when the reviewer's id is not in 
reviewers list.

Solution:

- Change "reviewers[reviewer_id]" to "reviewer_id in reviewers"
- Bump the default version to 0.5.0.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sijie/incubator-distributedlog 
fix_merge_script_issues

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-distributedlog/pull/134.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #134


commit 32cceab2b349e27df70c9b8a26044ddd68b39866
Author: Sijie Guo 
Date:   2017-05-26T21:04:31Z

fix merge script issue

commit d5a6f596cf174a6eddbc0acc3e4fffd5698f5ded
Author: Sijie Guo 
Date:   2017-06-12T19:54:01Z

Merge branch 'master' into fix_merge_script_issues

commit 56b65dfdeb296128085531dd35c07f5c35d0b65d
Author: Sijie Guo 
Date:   2017-06-12T19:54:49Z

Bump release version to 0.5.0




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-distributedlog issue #134: Fix merge script issues

2017-06-12 Thread sijie
Github user sijie commented on the issue:

https://github.com/apache/incubator-distributedlog/pull/134
  
/cc @fcuny @jiazhai 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] incubator-distributedlog git commit: DL-204: Bump libthrift to latest version for distributedlog-core

2017-06-12 Thread sijie
Repository: incubator-distributedlog
Updated Branches:
  refs/heads/master 9c6c9c452 -> 0cb775c93


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0cb775c9/distributedlog-tutorials/distributedlog-basic/basic-2.md
--
diff --git a/distributedlog-tutorials/distributedlog-basic/basic-2.md 
b/distributedlog-tutorials/distributedlog-basic/basic-2.md
index 0459bbf..b28cb9a 100644
--- a/distributedlog-tutorials/distributedlog-basic/basic-2.md
+++ b/distributedlog-tutorials/distributedlog-basic/basic-2.md
@@ -74,7 +74,7 @@ Run the example in the following steps:
 2.  Start the write proxy, listening on port 8000.
 ```
 // DistributedLogServerApp -p ${service-port} --shard-id ${shard-id} 
-sp ${stats-port} -u {distributedlog-uri} -mx -c ${conf-file}
-./distributedlog-service/bin/dlog 
org.apache.distributedlog.service.DistributedLogServerApp -p 8000 --shard-id 1 
-sp 8001 -u distributedlog://127.0.0.1:7000/messaging/distributedlog -mx -c 
${distributedlog-repo}/distributedlog-service/conf/distributedlog_proxy.conf
+./distributedlog-proxy-server/bin/dlog 
org.apache.distributedlog.service.DistributedLogServerApp -p 8000 --shard-id 1 
-sp 8001 -u distributedlog://127.0.0.1:7000/messaging/distributedlog -mx -c 
${distributedlog-repo}/distributedlog-proxy-server/conf/distributedlog_proxy.conf
 ```
 
 3.  Create the stream under the distributedlog uri.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0cb775c9/distributedlog-tutorials/distributedlog-basic/basic-3.md
--
diff --git a/distributedlog-tutorials/distributedlog-basic/basic-3.md 
b/distributedlog-tutorials/distributedlog-basic/basic-3.md
index 56972bf..5ddea90 100644
--- a/distributedlog-tutorials/distributedlog-basic/basic-3.md
+++ b/distributedlog-tutorials/distributedlog-basic/basic-3.md
@@ -107,7 +107,7 @@ Run the example in the following steps:
 2.  Start the write proxy, listening on port 8000.
 ```
 // DistributedLogServerApp -p ${service-port} --shard-id ${shard-id} 
-sp ${stats-port} -u {distributedlog-uri} -mx -c ${conf-file}
-./distributedlog-service/bin/dlog 
org.apache.distributedlog.service.DistributedLogServerApp -p 8000 --shard-id 1 
-sp 8001 -u distributedlog://127.0.0.1:7000/messaging/distributedlog -mx -c 
${distributedlog-repo}/distributedlog-service/conf/distributedlog_proxy.conf
+./distributedlog-proxy-server/bin/dlog 
org.apache.distributedlog.service.DistributedLogServerApp -p 8000 --shard-id 1 
-sp 8001 -u distributedlog://127.0.0.1:7000/messaging/distributedlog -mx -c 
${distributedlog-repo}/distributedlog-proxy-server/conf/distributedlog_proxy.conf
 ```
 
 3.  Create multi streams under the distributedlog uri.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0cb775c9/distributedlog-tutorials/distributedlog-basic/basic-4.md
--
diff --git a/distributedlog-tutorials/distributedlog-basic/basic-4.md 
b/distributedlog-tutorials/distributedlog-basic/basic-4.md
index 684bdeb..e30d628 100644
--- a/distributedlog-tutorials/distributedlog-basic/basic-4.md
+++ b/distributedlog-tutorials/distributedlog-basic/basic-4.md
@@ -92,7 +92,7 @@ Run the example in the following steps:
 2.  Start the write proxy, listening on port 8000.
 ```
 // DistributedLogServerApp -p ${service-port} --shard-id ${shard-id} 
-sp ${stats-port} -u {distributedlog-uri} -mx -c ${conf-file}
-./distributedlog-service/bin/dlog 
org.apache.distributedlog.service.DistributedLogServerApp -p 8000 --shard-id 1 
-sp 8001 -u distributedlog://127.0.0.1:7000/messaging/distributedlog -mx -c 
${distributedlog-repo}/distributedlog-service/conf/distributedlog_proxy.conf
+./distributedlog-proxy-server/bin/dlog 
org.apache.distributedlog.service.DistributedLogServerApp -p 8000 --shard-id 1 
-sp 8001 -u distributedlog://127.0.0.1:7000/messaging/distributedlog -mx -c 
${distributedlog-repo}/distributedlog-proxy-server/conf/distributedlog_proxy.conf
 ```
 
 3.  Create the stream under the distributedlog uri.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0cb775c9/distributedlog-tutorials/distributedlog-basic/basic-5.md
--
diff --git a/distributedlog-tutorials/distributedlog-basic/basic-5.md 
b/distributedlog-tutorials/distributedlog-basic/basic-5.md
index 684f228..ba984ee 100644
--- a/distributedlog-tutorials/distributedlog-basic/basic-5.md
+++ b/distributedlog-tutorials/distributedlog-basic/basic-5.md
@@ -90,7 +90,7 @@ Run the example in the following steps:
 2.  Start the write proxy, listening on port 8000.
 ```
 // DistributedLogServerApp -p ${service-port} --shard-id ${shard-id} 
-sp ${stats-port} -u {distributedlog-uri} -mx -c 

[GitHub] incubator-distributedlog pull request #132: DL-204: Bump libthrift to latest...

2017-06-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-distributedlog/pull/132


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] incubator-distributedlog git commit: DL-204: Bump libthrift to latest version for distributedlog-core

2017-06-12 Thread sijie
DL-204: Bump libthrift to latest version for distributedlog-core

Currently finagle heavily depends on an out-of-dated version - libthrift 5.0. 
Proxy modules (client, server) depend on this version, however the core library 
doesn't really depend on libthrift.

This change is to change libthrift to 0.9.* in distributedlog-core and shade it 
to avoid it conflict with the version used by finagle.

This change is based on #131 . The main change is at gitsha 
[6e58786](https://github.com/apache/incubator-distributedlog/commit/6e587869f87cdce50ae93ba3d52767719d1ab5a6)

Author: Sijie Guo 

Reviewers: Jia Zhai , Leigh Stewart 

Closes #132 from sijie/change_thrift_for_core_module


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/0cb775c9
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/0cb775c9
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/0cb775c9

Branch: refs/heads/master
Commit: 0cb775c931c7b92a0dda58bb59b4145ff9fdd255
Parents: 9c6c9c4
Author: Sijie Guo 
Authored: Mon Jun 12 14:12:13 2017 -0700
Committer: Sijie Guo 
Committed: Mon Jun 12 14:12:13 2017 -0700

--
 .gitignore  |3 +
 distributedlog-benchmark/pom.xml|2 +-
 distributedlog-core/pom.xml |   55 +-
 .../thrift/AccessControlEntry.java  |  793 +++
 .../distributedlog/thrift/BKDLConfigFormat.java | 1335 ++
 .../distributedlog/thrift/package-info.java |   21 +
 distributedlog-proxy-server/bin/dlog-env.sh |2 +-
 .../distributedlog-basic/basic-2.md |2 +-
 .../distributedlog-basic/basic-3.md |2 +-
 .../distributedlog-basic/basic-4.md |2 +-
 .../distributedlog-basic/basic-5.md |2 +-
 .../distributedlog-basic/basic-6.md |2 +-
 .../distributedlog-messaging/messaging-1.md |2 +-
 .../distributedlog-messaging/messaging-2.md |2 +-
 docs/admin_guide/bookkeeper.rst |   14 +-
 docs/admin_guide/operations.rst |2 +-
 docs/admin_guide/zookeeper.rst  |   14 +-
 docs/deployment/cluster.rst |   64 +-
 docs/start/download.rst |   35 +
 docs/start/quickstart.rst   |   14 +-
 docs/tutorials/basic-2.rst  |2 +-
 docs/tutorials/basic-3.rst  |2 +-
 docs/tutorials/basic-4.rst  |2 +-
 docs/tutorials/basic-5.rst  |2 +-
 docs/tutorials/basic-6.rst  |2 +-
 docs/tutorials/messaging-1.rst  |2 +-
 docs/tutorials/messaging-2.rst  |2 +-
 pom.xml |1 +
 scripts/dev/repackage.sh|2 +-
 scripts/integration/smoketest.sh|   10 +-
 scripts/snapshot|   16 +-
 vagrant/bk.sh   |   24 +-
 vagrant/zk.sh   |   18 +-
 33 files changed, 2325 insertions(+), 128 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0cb775c9/.gitignore
--
diff --git a/.gitignore b/.gitignore
index 914de41..e895020 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,6 @@ logs/
 
 # vagrant
 .vagrant
+
+# shade plugins
+*dependency-reduced-pom.xml

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0cb775c9/distributedlog-benchmark/pom.xml
--
diff --git a/distributedlog-benchmark/pom.xml b/distributedlog-benchmark/pom.xml
index 26651ad..bd9889c 100644
--- a/distributedlog-benchmark/pom.xml
+++ b/distributedlog-benchmark/pom.xml
@@ -32,7 +32,7 @@
 
 
   org.apache.distributedlog
-  distributedlog-service
+  distributedlog-proxy-server
   ${project.parent.version}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0cb775c9/distributedlog-core/pom.xml
--
diff --git a/distributedlog-core/pom.xml b/distributedlog-core/pom.xml
index 9c7afc7..a4f7568 100644
--- a/distributedlog-core/pom.xml
+++ b/distributedlog-core/pom.xml
@@ -87,12 +87,7 @@
 
   org.apache.thrift
   libthrift
-  ${libthrift.version}
-
-
-  com.twitter
-  scrooge-core_2.11
-  ${scrooge.version}
+  0.9.3
 
 
   org.apache.bookkeeper
@@ -150,23 +145,6 @@
   
 
  

Re: [VOTE] Merge DistributedLog as the subproject of Apache BookKeeper

2017-06-12 Thread Dave Fisher
Hi -

I am adding the Pulsar Dev list so that podling can be certainly aware. (I know 
that there are BookKeeper dev involved in Pulsar.

Please drop general@ off for any further discussions from the Pulsar side.

Regards,
Dave

> On Jun 8, 2017, at 5:21 PM, Sijie Guo  wrote:
> 
> ( /cc bookkeeper dev@ and incubator general@ for awareness )
> 
> Hi all,
> 
> There was a joint discussion between BookKeeper PMC and DistributedLog PPMC
> about moving the development of DistributedLog as part of Apache
> BookKeeper. The reasons behind it are:
> 
> First, DistributedLog is born as an extension to BookKeeper, to offer
> continuous log streams as the service. The ledger API in bookkeeper is a
> lower level API and has learning curves, while the log stream API in
> distributedlog is a higher level API that simplifies the usage. The
> combination of ledger API and stream API would offer a better
> developer/user experience for applications.
> 
> Secondly, using ledgers to build continuous (re-openable) log stream is a
> very common pattern for BookKeeper use cases. We did this for HDFS namenode
> journal, for Hedwig, for DistributedLog, and for Pulsar. The same pattern
> has been implemented again and again. Merge DistributedLog (also
> ManagedLedger in Pulsar) with BookKeeper will consolidate all the
> development efforts around this common 'log stream' pattern.
> 
> Thirdly, the 'log' stream abstraction is a good abstraction for both
> messaging and streaming. Internally at BookKeeper, there are a few places
> that can use such 'messaging' facility to improve bookkeeper itself. the
> log stream in DistributedLog can be used internally at bookkeeper for
> streaming changes as well.
> 
> We choose merging DistributedLog as subproject rather than modules. It is a
> softer starting point to avoid disrupting the folks who are depending on
> the ledger api alone. The BookKeeper PMC and DistributedLog PPMC has
> achieved initial consensus on this merge. There is an official VOTE ongoing
> in bookkeeper PMC. We'd like to bring this to the distributedlog community
> for a community vote following the guidelines here
> .
> 
> Please vote +1 if in favor of merging DistributedLog to BookKeeper, and -1
> if not. The vote will be open until Tuesday 13rd June, 18:00 PST.
> 
> - Sijie



signature.asc
Description: Message signed with OpenPGP


incubator-distributedlog git commit: DL-199: Be able to support filesystem-path like name

2017-06-12 Thread sijie
Repository: incubator-distributedlog
Updated Branches:
  refs/heads/master c44e0278e -> 9c6c9c452


DL-199: Be able to support filesystem-path like name

In order to support hierarchical namespace, we need to be able to support 
filesystem path like log name.

Author: Sijie Guo 

Reviewers: Jia Zhai , Leigh Stewart 

Closes #130 from sijie/DL_199


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/9c6c9c45
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/9c6c9c45
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/9c6c9c45

Branch: refs/heads/master
Commit: 9c6c9c4520957499e305adf0658dd8711e99491b
Parents: c44e027
Author: Sijie Guo 
Authored: Mon Jun 12 12:55:41 2017 -0700
Committer: Sijie Guo 
Committed: Mon Jun 12 12:55:41 2017 -0700

--
 .../BKDistributedLogNamespace.java  |  10 +-
 .../distributedlog/impl/BKNamespaceDriver.java  |   4 +-
 .../org/apache/distributedlog/util/DLUtils.java | 106 ++-
 .../org/apache/distributedlog/util/Utils.java   |   1 +
 .../TestBKDistributedLogNamespace.java  |  37 ++-
 .../apache/distributedlog/util/TestDLUtils.java |  34 ++
 6 files changed, 152 insertions(+), 40 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9c6c9c45/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
--
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
index 0a4608e..adb591f 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
@@ -49,7 +49,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER;
-import static org.apache.distributedlog.util.DLUtils.validateName;
+import static org.apache.distributedlog.util.DLUtils.validateAndNormalizeName;
 
 /**
  * BKDistributedLogNamespace is the default implementation of {@link 
DistributedLogNamespace}. It uses
@@ -148,7 +148,7 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
 public void createLog(String logName)
 throws InvalidStreamNameException, IOException {
 checkState();
-validateName(logName);
+logName = validateAndNormalizeName(logName);
 URI uri = 
FutureUtils.result(driver.getLogMetadataStore().createLog(logName));
 
FutureUtils.result(driver.getLogStreamMetadataStore(WRITER).getLog(uri, 
logName, true, true));
 }
@@ -157,7 +157,7 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
 public void deleteLog(String logName)
 throws InvalidStreamNameException, LogNotFoundException, 
IOException {
 checkState();
-validateName(logName);
+logName = validateAndNormalizeName(logName);
 Optional uri = 
FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName));
 if (!uri.isPresent()) {
 throw new LogNotFoundException("Log " + logName + " isn't found.");
@@ -186,7 +186,7 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
  Optional 
perStreamStatsLogger)
 throws InvalidStreamNameException, IOException {
 checkState();
-validateName(logName);
+logName = validateAndNormalizeName(logName);
 Optional uri = 
FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName));
 if (!uri.isPresent()) {
 throw new LogNotFoundException("Log " + logName + " isn't found.");
@@ -256,7 +256,7 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
 throws InvalidStreamNameException, IOException {
 // Make sure the name is well formed
 checkState();
-validateName(nameOfLogStream);
+nameOfLogStream = validateAndNormalizeName(nameOfLogStream);
 
 DistributedLogConfiguration mergedConfiguration = new 
DistributedLogConfiguration();
 mergedConfiguration.addConfiguration(conf);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9c6c9c45/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java

[GitHub] incubator-distributedlog pull request #130: DL-199: Be able to support files...

2017-06-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-distributedlog/pull/130


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-distributedlog pull request #131: DL-205: Remove StatusCode depend...

2017-06-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-distributedlog/pull/131


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Github repo isn't in-sync with the git repo

2017-06-12 Thread Sijie Guo
It seems that the recent merged changes are not propagated into the github
mirror. it seems something is broken in INFRA. try to asking INFRA team.


Re: [VOTE] Merge DistributedLog as the subproject of Apache BookKeeper

2017-06-12 Thread Franck Cuny
+1 for subproject.

On Mon, Jun 12, 2017 at 7:57 AM Leigh Stewart 
wrote:

> +1
>
> On Fri, Jun 9, 2017 at 2:48 PM, Uma gangumalla 
> wrote:
>
> > +1 (binding)
> >
> > Regards,
> > Uma
> >
> > On Thu, Jun 8, 2017 at 5:21 PM, Sijie Guo  wrote:
> >
> > > ( /cc bookkeeper dev@ and incubator general@ for awareness )
> > >
> > > Hi all,
> > >
> > > There was a joint discussion between BookKeeper PMC and DistributedLog
> > PPMC
> > > about moving the development of DistributedLog as part of Apache
> > > BookKeeper. The reasons behind it are:
> > >
> > > First, DistributedLog is born as an extension to BookKeeper, to offer
> > > continuous log streams as the service. The ledger API in bookkeeper is
> a
> > > lower level API and has learning curves, while the log stream API in
> > > distributedlog is a higher level API that simplifies the usage. The
> > > combination of ledger API and stream API would offer a better
> > > developer/user experience for applications.
> > >
> > > Secondly, using ledgers to build continuous (re-openable) log stream
> is a
> > > very common pattern for BookKeeper use cases. We did this for HDFS
> > namenode
> > > journal, for Hedwig, for DistributedLog, and for Pulsar. The same
> pattern
> > > has been implemented again and again. Merge DistributedLog (also
> > > ManagedLedger in Pulsar) with BookKeeper will consolidate all the
> > > development efforts around this common 'log stream' pattern.
> > >
> > > Thirdly, the 'log' stream abstraction is a good abstraction for both
> > > messaging and streaming. Internally at BookKeeper, there are a few
> places
> > > that can use such 'messaging' facility to improve bookkeeper itself.
> the
> > > log stream in DistributedLog can be used internally at bookkeeper for
> > > streaming changes as well.
> > >
> > > We choose merging DistributedLog as subproject rather than modules. It
> > is a
> > > softer starting point to avoid disrupting the folks who are depending
> on
> > > the ledger api alone. The BookKeeper PMC and DistributedLog PPMC has
> > > achieved initial consensus on this merge. There is an official VOTE
> > ongoing
> > > in bookkeeper PMC. We'd like to bring this to the distributedlog
> > community
> > > for a community vote following the guidelines here
> > > .
> > >
> > > Please vote +1 if in favor of merging DistributedLog to BookKeeper, and
> > -1
> > > if not. The vote will be open until Tuesday 13rd June, 18:00 PST.
> > >
> > > - Sijie
> > >
> >
>
-- 
-franck


[12/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
--
diff --git 
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
new file mode 100644
index 000..58b5b2a
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import org.apache.distributedlog.DLMTestUtil;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.client.DistributedLogClientImpl;
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import org.apache.distributedlog.client.routing.LocalRoutingService;
+import org.apache.distributedlog.client.routing.RegionsRoutingService;
+import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
+import org.apache.distributedlog.service.stream.StreamManager;
+import org.apache.distributedlog.service.stream.StreamManagerImpl;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Duration;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+/**
+ * Base test case for distributedlog servers.
+ */
+public abstract class DistributedLogServerTestCase {
+
+protected static DistributedLogConfiguration conf =
+new DistributedLogConfiguration().setLockTimeout(10)
+
.setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10);
+protected static DistributedLogConfiguration noAdHocConf =
+new 
DistributedLogConfiguration().setLockTimeout(10).setCreateStreamIfNotExists(false)
+
.setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10);
+protected static DistributedLogCluster dlCluster;
+protected static DistributedLogCluster noAdHocCluster;
+
+/**
+ * A distributedlog client wrapper for testing.
+ */
+protected static class DLClient {
+public final LocalRoutingService routingService;
+public DistributedLogClientBuilder dlClientBuilder;
+public final DistributedLogClientImpl dlClient;
+
+protected DLClient(String name,
+   String streamNameRegex,
+   Optional serverSideRoutingFinagleName) {
+routingService = LocalRoutingService.newBuilder().build();
+dlClientBuilder = DistributedLogClientBuilder.newBuilder()
+.name(name)
+.clientId(ClientId$.MODULE$.apply(name))
+.routingService(routingService)
+.streamNameRegex(streamNameRegex)
+.handshakeWithClientInfo(true)
+.clientBuilder(ClientBuilder.get()
+.hostConnectionLimit(1)
+.connectionTimeout(Duration.fromSeconds(1))
+.requestTimeout(Duration.fromSeconds(60)));
+if (serverSideRoutingFinagleName.isPresent()) {
+dlClientBuilder =
+
dlClientBuilder.serverRoutingServiceFinagleNameStr(serverSideRoutingFinagleName.get());
+}
+dlClient = (DistributedLogClientImpl) dlClientBuilder.build();
+}
+
+public void handshake() {
+dlClient.handshake();
+}
+
+public void shutdown() {
+dlClient.close();
+

[01/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
Repository: incubator-distributedlog
Updated Branches:
  refs/heads/master 52c0eef87 -> c44e0278e


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
--
diff --git 
a/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
 
b/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
deleted file mode 100644
index 431bfa4..000
--- 
a/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream.limiter;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.config.ConcurrentConstConfiguration;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.exceptions.OverCapacityException;
-import org.apache.distributedlog.limiter.ChainedRequestLimiter;
-import org.apache.distributedlog.limiter.ComposableRequestLimiter;
-import org.apache.distributedlog.limiter.ComposableRequestLimiter.CostFunction;
-import 
org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
-import org.apache.distributedlog.limiter.GuavaRateLimiter;
-import org.apache.distributedlog.limiter.RateLimiter;
-import org.apache.distributedlog.limiter.RequestLimiter;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.bookkeeper.feature.SettableFeature;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.junit.Test;
-
-/**
- * Test Case for {@link ServiceRequestLimiter}.
- */
-public class TestServiceRequestLimiter {
-
-/**
- * Mock Request.
- */
-class MockRequest {
-int size;
-MockRequest() {
-this(1);
-}
-MockRequest(int size) {
-this.size = size;
-}
-int getSize() {
-return size;
-}
-}
-
-/**
- * Mock request limiter.
- */
-class MockRequestLimiter implements RequestLimiter {
-public void apply(MockRequest request) {
-}
-}
-
-/**
- * Counter based limiter.
- */
-static class CounterLimiter implements RateLimiter {
-final int limit;
-int count;
-
-public CounterLimiter(int limit) {
-this.limit = limit;
-this.count = 0;
-}
-
-@Override
-public boolean acquire(int permits) {
-if (++count > limit) {
-return false;
-}
-return true;
-}
-}
-
-/**
- * Mock hard request limiter.
- */
-class MockHardRequestLimiter implements RequestLimiter {
-
-RequestLimiter limiter;
-int limitHitCount;
-
-MockHardRequestLimiter(int limit) {
-this(GuavaRateLimiter.of(limit));
-}
-
-MockHardRequestLimiter(RateLimiter limiter) {
-this.limiter = new ComposableRequestLimiter(
-limiter,
-new OverlimitFunction() {
-public void apply(MockRequest request) throws 
OverCapacityException {
-limitHitCount++;
-throw new OverCapacityException("Limit exceeded");
-}
-},
-new CostFunction() {
-public int apply(MockRequest request) {
-return request.getSize();
-}
-},
-NullStatsLogger.INSTANCE);
-}
-
-@Override
-public void apply(MockRequest op) throws OverCapacityException {
-limiter.apply(op);
-}
-
-public int getLimitHitCount() {
-return limitHitCount;
-}
-}
-
-/**
- * Mock soft 

[05/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
--
diff --git 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
 
b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
deleted file mode 100644
index bf7a1ad..000
--- 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
+++ /dev/null
@@ -1,925 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
-import org.apache.distributedlog.AsyncLogWriter;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogManager;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.exceptions.AlreadyClosedException;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.OverCapacityException;
-import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
-import org.apache.distributedlog.exceptions.StreamNotReadyException;
-import org.apache.distributedlog.exceptions.StreamUnavailableException;
-import org.apache.distributedlog.exceptions.UnexpectedException;
-import org.apache.distributedlog.io.Abortables;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.service.FatalErrorHandler;
-import org.apache.distributedlog.service.ServerFeatureKeys;
-import org.apache.distributedlog.service.config.ServerConfiguration;
-import org.apache.distributedlog.service.config.StreamConfigProvider;
-import org.apache.distributedlog.service.stream.limiter.StreamRequestLimiter;
-import org.apache.distributedlog.service.streamset.Partition;
-import org.apache.distributedlog.stats.BroadCastStatsLogger;
-import org.apache.distributedlog.util.FutureUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.TimeSequencer;
-import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Duration;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.TimeoutException;
-import com.twitter.util.Timer;
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-/**
- * Implementation of {@link Stream}.
- */
-public class StreamImpl implements Stream {
-
-private static final Logger logger = 
LoggerFactory.getLogger(StreamImpl.class);
-
-/**
- * The status of the stream.
- *
- * The status change of the stream should just go in one direction. If 
a stream hits
- * any error, the stream should be put in error state. If a stream is in 
error state,
- * it should be removed and not reused anymore.
- */
-public enum StreamStatus {
-UNINITIALIZED(-1),
-INITIALIZING(0),
-INITIALIZED(1),
-CLOSING(-4),
-CLOSED(-5),
-// if a stream is in error state, it should be abort during closing.
-ERROR(-6);
-
-final int code;
-
-StreamStatus(int 

[26/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java
--
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java
deleted file mode 100644
index 0e2a152..000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java
+++ /dev/null
@@ -1,608 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.twitter.common.zookeeper.ServerSet;
-import org.apache.distributedlog.client.ClientConfig;
-import org.apache.distributedlog.client.DistributedLogClientImpl;
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.distributedlog.client.proxy.ClusterClient;
-import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
-import org.apache.distributedlog.client.resolver.RegionResolver;
-import org.apache.distributedlog.client.routing.RegionsRoutingService;
-import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.client.routing.RoutingUtils;
-import org.apache.distributedlog.thrift.service.DistributedLogService;
-import com.twitter.finagle.Name;
-import com.twitter.finagle.Resolver$;
-import com.twitter.finagle.Service;
-import com.twitter.finagle.ThriftMux;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import com.twitter.finagle.stats.StatsReceiver;
-import com.twitter.finagle.thrift.ClientId;
-import com.twitter.finagle.thrift.ThriftClientFramedCodec;
-import com.twitter.finagle.thrift.ThriftClientRequest;
-import com.twitter.util.Duration;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.util.Random;
-import org.apache.commons.lang.StringUtils;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-
-/**
- * Builder to build {@link DistributedLogClient}.
- */
-public final class DistributedLogClientBuilder {
-
-private static final Logger logger = 
LoggerFactory.getLogger(DistributedLogClientBuilder.class);
-
-private static final Random random = new 
Random(System.currentTimeMillis());
-
-private String name = null;
-private ClientId clientId = null;
-private RoutingService.Builder routingServiceBuilder = null;
-private ClientBuilder clientBuilder = null;
-private String serverRoutingServiceFinagleName = null;
-private StatsReceiver statsReceiver = new NullStatsReceiver();
-private StatsReceiver streamStatsReceiver = new NullStatsReceiver();
-private ClientConfig clientConfig = new ClientConfig();
-private boolean enableRegionStats = false;
-private final RegionResolver regionResolver = new DefaultRegionResolver();
-
-/**
- * Create a client builder.
- *
- * @return client builder
- */
-public static DistributedLogClientBuilder newBuilder() {
-return new DistributedLogClientBuilder();
-}
-
-/**
- * Create a new client builder from an existing {@code builder}.
- *
- * @param builder the existing builder.
- * @return a new client builder.
- */
-public static DistributedLogClientBuilder 
newBuilder(DistributedLogClientBuilder builder) {
-DistributedLogClientBuilder newBuilder = new 
DistributedLogClientBuilder();
-newBuilder.name = builder.name;
-newBuilder.clientId = builder.clientId;
-newBuilder.clientBuilder = builder.clientBuilder;
-newBuilder.routingServiceBuilder = builder.routingServiceBuilder;
-newBuilder.statsReceiver = builder.statsReceiver;
-newBuilder.streamStatsReceiver = builder.streamStatsReceiver;
-

[07/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
--
diff --git 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
 
b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
deleted file mode 100644
index 3c53ccf..000
--- 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.RateLimiter;
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.distributedlog.service.DistributedLogClient;
-import java.net.SocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A balancer balances ownerships between two targets.
- */
-public class SimpleBalancer implements Balancer {
-
-private static final Logger logger = 
LoggerFactory.getLogger(SimpleBalancer.class);
-
-protected final String target1;
-protected final String target2;
-protected final DistributedLogClient targetClient1;
-protected final DistributedLogClient targetClient2;
-protected final MonitorServiceClient targetMonitor1;
-protected final MonitorServiceClient targetMonitor2;
-
-public SimpleBalancer(String name1,
-  DistributedLogClient client1,
-  MonitorServiceClient monitor1,
-  String name2,
-  DistributedLogClient client2,
-  MonitorServiceClient monitor2) {
-this.target1 = name1;
-this.targetClient1 = client1;
-this.targetMonitor1 = monitor1;
-this.target2 = name2;
-this.targetClient2 = client2;
-this.targetMonitor2 = monitor2;
-}
-
-protected static int countNumberStreams(Map 
distribution) {
-int count = 0;
-for (Set streams : distribution.values()) {
-count += streams.size();
-}
-return count;
-}
-
-@Override
-public void balance(int rebalanceWaterMark,
-double rebalanceTolerancePercentage,
-int rebalanceConcurrency,
-Optional rebalanceRateLimiter) {
-// get the ownership distributions from individual targets
-Map distribution1 = 
targetMonitor1.getStreamOwnershipDistribution();
-Map distribution2 = 
targetMonitor2.getStreamOwnershipDistribution();
-
-// get stream counts
-int proxyCount1 = distribution1.size();
-int streamCount1 = countNumberStreams(distribution1);
-int proxyCount2 = distribution2.size();
-int streamCount2 = countNumberStreams(distribution2);
-
-logger.info("'{}' has {} streams by {} proxies; while '{}' has {} 
streams by {} proxies.",
-new Object[] {target1, streamCount1, proxyCount1, target2, 
streamCount2, proxyCount2 });
-
-String source, target;
-Map srcDistribution;
-DistributedLogClient srcClient, targetClient;
-MonitorServiceClient srcMonitor, targetMonitor;
-int srcStreamCount, targetStreamCount;
-if (streamCount1 > streamCount2) {
-source = target1;
-srcStreamCount = streamCount1;
-srcClient = targetClient1;
-srcMonitor = targetMonitor1;
-srcDistribution = distribution1;
-
-target = target2;
-targetStreamCount = streamCount2;
-targetClient = targetClient2;
-targetMonitor = targetMonitor2;
- 

[27/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java
--
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java
deleted file mode 100644
index 4fe8141..000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import org.apache.distributedlog.service.DLSocketAddress;
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Routing Service based on a given {@link 
com.twitter.common.zookeeper.ServerSet}.
- */
-class ServerSetRoutingService extends Thread implements RoutingService {
-
-private static final Logger logger = 
LoggerFactory.getLogger(ServerSetRoutingService.class);
-
-static ServerSetRoutingServiceBuilder newServerSetRoutingServiceBuilder() {
-return new ServerSetRoutingServiceBuilder();
-}
-
-/**
- * Builder to build {@link com.twitter.common.zookeeper.ServerSet} based 
routing service.
- */
-static class ServerSetRoutingServiceBuilder implements 
RoutingService.Builder {
-
-private ServerSetWatcher serverSetWatcher;
-
-private ServerSetRoutingServiceBuilder() {}
-
-public ServerSetRoutingServiceBuilder 
serverSetWatcher(ServerSetWatcher serverSetWatcher) {
-this.serverSetWatcher = serverSetWatcher;
-return this;
-}
-
-@Override
-public Builder statsReceiver(StatsReceiver statsReceiver) {
-return this;
-}
-
-@Override
-public RoutingService build() {
-checkNotNull(serverSetWatcher, "No serverset watcher provided.");
-return new ServerSetRoutingService(this.serverSetWatcher);
-}
-}
-
-private static class HostComparator implements Comparator {
-
-private static final HostComparator INSTANCE = new HostComparator();
-
-@Override
-public int compare(SocketAddress o1, SocketAddress o2) {
-return o1.toString().compareTo(o2.toString());
-}
-}
-
-private final ServerSetWatcher serverSetWatcher;
-
-private final Set hostSet = new HashSet();
-private List hostList = new ArrayList();
-private final HashFunction hasher = Hashing.md5();
-
-// Server Set Changes
-private final AtomicReference 
serverSetChange =
-new AtomicReference(null);
-private final CountDownLatch changeLatch = new CountDownLatch(1);
-
-// Listeners
-protected final CopyOnWriteArraySet listeners =
-new CopyOnWriteArraySet();
-
-ServerSetRoutingService(ServerSetWatcher serverSetWatcher) {
-super("ServerSetRoutingService");
-this.serverSetWatcher = serverSetWatcher;
-}
-
-@Override
-public Set getHosts() {
-synchronized (hostSet) {
-return ImmutableSet.copyOf(hostSet);
-}
-}
-
-@Override
-public void startService() {
-start();
-try {
-if (!changeLatch.await(1, TimeUnit.MINUTES)) 

[02/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java
--
diff --git 
a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java
 
b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java
deleted file mode 100644
index d0a2f88..000
--- 
a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.feature.DefaultFeatureProvider;
-import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.feature.SettableFeature;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test Case for {@link 
org.apache.distributedlog.exceptions.RegionUnavailableException}.
- */
-public class TestRegionUnavailable extends DistributedLogServerTestCase {
-
-/**
- * A feature provider for testing.
- */
-public static class TestFeatureProvider extends DefaultFeatureProvider {
-
-public TestFeatureProvider(String rootScope,
-   DistributedLogConfiguration conf,
-   StatsLogger statsLogger) {
-super(rootScope, conf, statsLogger);
-}
-
-@Override
-protected Feature makeFeature(String featureName) {
-if 
(featureName.contains(ServerFeatureKeys.REGION_STOP_ACCEPT_NEW_STREAM.name().toLowerCase()))
 {
-return new SettableFeature(featureName, 1);
-}
-return super.makeFeature(featureName);
-}
-
-@Override
-protected FeatureProvider makeProvider(String fullScopeName) {
-return super.makeProvider(fullScopeName);
-}
-}
-
-private final int numServersPerDC = 3;
-private final List localCluster;
-private final List remoteCluster;
-private TwoRegionDLClient client;
-
-public TestRegionUnavailable() {
-super(true);
-this.localCluster = new ArrayList();
-this.remoteCluster = new ArrayList();
-}
-
-@Before
-@Override
-public void setup() throws Exception {
-DistributedLogConfiguration localConf = new 
DistributedLogConfiguration();
-localConf.addConfiguration(conf);
-localConf.setFeatureProviderClass(TestFeatureProvider.class);
-DistributedLogConfiguration remoteConf = new 
DistributedLogConfiguration();
-remoteConf.addConfiguration(conf);
-super.setup();
-int localPort = 9010;
-int remotePort = 9020;
-for (int i = 0; i < numServersPerDC; i++) {
-localCluster.add(createDistributedLogServer(localConf, localPort + 
i));
-remoteCluster.add(createDistributedLogServer(remoteConf, 
remotePort + i));
-}
-Map regionMap = new HashMap();
-for (DLServer server : localCluster) {
-regionMap.put(server.getAddress(), "local");
-}
-for (DLServer server : remoteCluster) {
-regionMap.put(server.getAddress(), "remote");
-}
-client = createTwoRegionDLClient("two_regions_client", regionMap);
-
-}
-
-private void registerStream(String streamName) {
-for (DLServer server : localCluster) {
-client.localRoutingService.addHost(streamName, 
server.getAddress());
-}
-client.remoteRoutingService.addHost(streamName, 
remoteCluster.get(0).getAddress());
-}
-
- 

[03/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
--
diff --git 
a/distributedlog-service/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
 
b/distributedlog-service/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
deleted file mode 100644
index 58b5b2a..000
--- 
a/distributedlog-service/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
-import org.apache.distributedlog.DLMTestUtil;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.client.DistributedLogClientImpl;
-import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
-import org.apache.distributedlog.client.routing.LocalRoutingService;
-import org.apache.distributedlog.client.routing.RegionsRoutingService;
-import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
-import org.apache.distributedlog.service.stream.StreamManager;
-import org.apache.distributedlog.service.stream.StreamManagerImpl;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.thrift.ClientId$;
-import com.twitter.util.Duration;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-
-/**
- * Base test case for distributedlog servers.
- */
-public abstract class DistributedLogServerTestCase {
-
-protected static DistributedLogConfiguration conf =
-new DistributedLogConfiguration().setLockTimeout(10)
-
.setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10);
-protected static DistributedLogConfiguration noAdHocConf =
-new 
DistributedLogConfiguration().setLockTimeout(10).setCreateStreamIfNotExists(false)
-
.setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10);
-protected static DistributedLogCluster dlCluster;
-protected static DistributedLogCluster noAdHocCluster;
-
-/**
- * A distributedlog client wrapper for testing.
- */
-protected static class DLClient {
-public final LocalRoutingService routingService;
-public DistributedLogClientBuilder dlClientBuilder;
-public final DistributedLogClientImpl dlClient;
-
-protected DLClient(String name,
-   String streamNameRegex,
-   Optional serverSideRoutingFinagleName) {
-routingService = LocalRoutingService.newBuilder().build();
-dlClientBuilder = DistributedLogClientBuilder.newBuilder()
-.name(name)
-.clientId(ClientId$.MODULE$.apply(name))
-.routingService(routingService)
-.streamNameRegex(streamNameRegex)
-.handshakeWithClientInfo(true)
-.clientBuilder(ClientBuilder.get()
-.hostConnectionLimit(1)
-.connectionTimeout(Duration.fromSeconds(1))
-.requestTimeout(Duration.fromSeconds(60)));
-if (serverSideRoutingFinagleName.isPresent()) {
-dlClientBuilder =
-
dlClientBuilder.serverRoutingServiceFinagleNameStr(serverSideRoutingFinagleName.get());
-}
-dlClient = (DistributedLogClientImpl) dlClientBuilder.build();
-}
-
-public void handshake() {
-dlClient.handshake();
-}
-
-public void shutdown() {
-dlClient.close();
-}
-}
-
-

[06/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
--
diff --git 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
 
b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
deleted file mode 100644
index 862f05a..000
--- 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.impl.BKNamespaceDriver;
-import org.apache.distributedlog.util.Utils;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.HashSet;
-import java.util.List;
-import java.util.TreeSet;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Transaction;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An implementation of the PlacementStateManager that saves data to and loads 
from Zookeeper to
- * avoid necessitating an additional system for the resource placement.
- */
-public class ZKPlacementStateManager implements PlacementStateManager {
-
-private static final Logger logger = 
LoggerFactory.getLogger(ZKPlacementStateManager.class);
-
-private static final String SERVER_LOAD_DIR = "/.server-load";
-
-private final String serverLoadPath;
-private final ZooKeeperClient zkClient;
-
-private boolean watching = false;
-
-public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, 
StatsLogger statsLogger) {
-String zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri);
-zkClient = BKNamespaceDriver.createZKClientBuilder(
-String.format("ZKPlacementStateManager-%s", zkServers),
-conf,
-zkServers,
-statsLogger.scope("placement_state_manager")).build();
-serverLoadPath = uri.getPath() + SERVER_LOAD_DIR;
-}
-
-private void createServerLoadPathIfNoExists(byte[] data)
-throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, 
InterruptedException {
-try {
-Utils.zkCreateFullPathOptimistic(
-zkClient, serverLoadPath, data, zkClient.getDefaultACL(), 
CreateMode.PERSISTENT);
-} catch (KeeperException.NodeExistsException nee) {
-logger.debug("the server load path {} is already created by 
others", serverLoadPath, nee);
-}
-}
-
-@Override
-public void saveOwnership(TreeSet serverLoads) throws 
StateManagerSaveException {
-logger.info("saving ownership");
-try {
-ZooKeeper zk = zkClient.get();
-// use timestamp as data so watchers will see any changes
-byte[] timestamp = 
ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
-
-if (zk.exists(serverLoadPath, false) == null) { //create path to 
rootnode if it does not yet exist
-createServerLoadPathIfNoExists(timestamp);
-}
-
-Transaction tx = zk.transaction();
-List children = zk.getChildren(serverLoadPath, false);
-HashSet servers = new HashSet(children);
-tx.setData(serverLoadPath, timestamp, -1); // trigger the watcher 
that data has been updated
-for (ServerLoad serverLoad : serverLoads) {
-String server = serverToZkFormat(serverLoad.getServer());
-String serverPath = serverPath(server);
-if (servers.contains(server)) {
-  

[17/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java
--
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java
new file mode 100644
index 000..b1e2879
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java
@@ -0,0 +1,469 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import com.twitter.common.zookeeper.ServerSet;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.LogSegmentMetadata;
+import org.apache.distributedlog.callback.LogSegmentListener;
+import org.apache.distributedlog.callback.NamespaceListener;
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.distributedlog.client.serverset.DLZkServerSet;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.stats.Stat;
+import com.twitter.finagle.stats.StatsReceiver;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Duration;
+import com.twitter.util.FutureEventListener;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Monitor Service.
+ */
+public class MonitorService implements NamespaceListener {
+
+private static final Logger logger = 
LoggerFactory.getLogger(MonitorService.class);
+
+private DistributedLogNamespace dlNamespace = null;
+private MonitorServiceClient dlClient = null;
+private DLZkServerSet[] zkServerSets = null;
+private final ScheduledExecutorService executorService =
+
Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
+private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
+private final Map knownStreams = new 
HashMap();
+
+// Settings
+private int regionId = DistributedLogConstants.LOCAL_REGION_ID;
+private int interval = 100;
+private String streamRegex = null;
+private boolean watchNamespaceChanges = false;
+private boolean handshakeWithClientInfo = false;
+private int heartbeatEveryChecks = 0;
+private int instanceId = -1;
+private int totalInstances = -1;
+private boolean isThriftMux = false;
+
+// Options
+private final Optional uriArg;
+private final Optional confFileArg;
+private final Optional serverSetArg;
+private final Optional intervalArg;
+private final Optional regionIdArg;
+private final Optional streamRegexArg;
+private final Optional instanceIdArg;
+private final Optional totalInstancesArg;
+private final 

[28/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java
--
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java
deleted file mode 100644
index 6ef1d8e..000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.proxy;
-
-import org.apache.distributedlog.client.ClientConfig;
-import org.apache.distributedlog.client.stats.ClientStats;
-import org.apache.distributedlog.thrift.service.DistributedLogService;
-import com.twitter.finagle.Service;
-import com.twitter.finagle.ThriftMux;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.thrift.ClientId;
-import com.twitter.finagle.thrift.ThriftClientFramedCodec;
-import com.twitter.finagle.thrift.ThriftClientRequest;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import scala.Option;
-import scala.runtime.BoxedUnit;
-
-/**
- * Client talks to a single proxy.
- */
-public class ProxyClient {
-
-  /**
-   * Builder to build a proxy client talking to given host 
address.
-   */
-  public interface Builder {
-/**
- * Build a proxy client to address.
- *
- * @param address
- *  proxy address
- * @return proxy client
- */
-ProxyClient build(SocketAddress address);
-}
-
-public static Builder newBuilder(String clientName,
- ClientId clientId,
- ClientBuilder clientBuilder,
- ClientConfig clientConfig,
- ClientStats clientStats) {
-return new DefaultBuilder(clientName, clientId, clientBuilder, 
clientConfig, clientStats);
-}
-
-/**
- * Default Builder for {@link ProxyClient}.
- */
-public static class DefaultBuilder implements Builder {
-
-private final String clientName;
-private final ClientId clientId;
-private final ClientBuilder clientBuilder;
-private final ClientStats clientStats;
-
-private DefaultBuilder(String clientName,
-   ClientId clientId,
-   ClientBuilder clientBuilder,
-   ClientConfig clientConfig,
-   ClientStats clientStats) {
-this.clientName = clientName;
-this.clientId = clientId;
-this.clientStats = clientStats;
-// client builder
-ClientBuilder builder = setDefaultSettings(
-null == clientBuilder ? 
getDefaultClientBuilder(clientConfig) : clientBuilder);
-this.clientBuilder = configureThriftMux(builder, clientId, 
clientConfig);
-}
-
-@SuppressWarnings("unchecked")
-private ClientBuilder configureThriftMux(ClientBuilder builder,
- ClientId clientId,
- ClientConfig clientConfig) {
-if (clientConfig.getThriftMux()) {
-return 
builder.stack(ThriftMux.client().withClientId(clientId));
-} else {
-return 
builder.codec(ThriftClientFramedCodec.apply(Option.apply(clientId)));
-}
-}
-
-private ClientBuilder getDefaultClientBuilder(ClientConfig 
clientConfig) {
-ClientBuilder builder = ClientBuilder.get()
-.tcpConnectTimeout(Duration.fromMilliseconds(200))
-.connectTimeout(Duration.fromMilliseconds(200))
-.requestTimeout(Duration.fromSeconds(1));
-if (!clientConfig.getThriftMux()) 

[20/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
--
diff --git 
a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
 
b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
new file mode 100644
index 000..86d1c11
--- /dev/null
+++ 
b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.ownership;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.distributedlog.client.ClientConfig;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.Set;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Test Case for Ownership Cache.
+ */
+public class TestOwnershipCache {
+
+@Rule
+public TestName runtime = new TestName();
+
+private static OwnershipCache createOwnershipCache() {
+ClientConfig clientConfig = new ClientConfig();
+return new OwnershipCache(clientConfig, null,
+  NullStatsReceiver.get(), 
NullStatsReceiver.get());
+}
+
+private static SocketAddress createSocketAddress(int port) {
+return new InetSocketAddress("127.0.0.1", port);
+}
+
+@Test(timeout = 6)
+public void testUpdateOwner() {
+OwnershipCache cache = createOwnershipCache();
+SocketAddress addr = createSocketAddress(1000);
+String stream = runtime.getMethodName();
+
+assertTrue("Should successfully update owner if no owner exists 
before",
+cache.updateOwner(stream, addr));
+assertEquals("Owner should be " + addr + " for stream " + stream,
+addr, cache.getOwner(stream));
+assertTrue("Should successfully update owner if old owner is same",
+cache.updateOwner(stream, addr));
+assertEquals("Owner should be " + addr + " for stream " + stream,
+addr, cache.getOwner(stream));
+}
+
+@Test(timeout = 6)
+public void testRemoveOwnerFromStream() {
+OwnershipCache cache = createOwnershipCache();
+int initialPort = 2000;
+int numProxies = 2;
+int numStreamsPerProxy = 2;
+for (int i = 0; i < numProxies; i++) {
+SocketAddress addr = createSocketAddress(initialPort + i);
+for (int j = 0; j < numStreamsPerProxy; j++) {
+String stream = runtime.getMethodName() + "_" + i + "_" + j;
+cache.updateOwner(stream, addr);
+}
+}
+Map ownershipMap = 
cache.getStreamOwnerMapping();
+assertEquals("There should be " + (numProxies * numStreamsPerProxy) + 
" entries in cache",
+numProxies * numStreamsPerProxy, ownershipMap.size());
+Map ownershipDistribution = 
cache.getStreamOwnershipDistribution();
+assertEquals("There should be " + numProxies + " proxies cached",
+numProxies, ownershipDistribution.size());
+
+String stream = runtime.getMethodName() + "_0_0";
+SocketAddress owner = createSocketAddress(initialPort);
+
+// remove non-existent mapping won't change anything
+SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999);
+cache.removeOwnerFromStream(stream, nonExistentAddr, 
"remove-non-existent-addr");
+assertEquals("Owner " + owner + " should not be removed",
+owner, cache.getOwner(stream));
+ownershipMap = cache.getStreamOwnerMapping();
+assertEquals("There should be " + (numProxies * 

[10/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
--
diff --git 
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
new file mode 100644
index 000..431bfa4
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream.limiter;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.config.ConcurrentConstConfiguration;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.OverCapacityException;
+import org.apache.distributedlog.limiter.ChainedRequestLimiter;
+import org.apache.distributedlog.limiter.ComposableRequestLimiter;
+import org.apache.distributedlog.limiter.ComposableRequestLimiter.CostFunction;
+import 
org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
+import org.apache.distributedlog.limiter.GuavaRateLimiter;
+import org.apache.distributedlog.limiter.RateLimiter;
+import org.apache.distributedlog.limiter.RequestLimiter;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.feature.SettableFeature;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Test;
+
+/**
+ * Test Case for {@link ServiceRequestLimiter}.
+ */
+public class TestServiceRequestLimiter {
+
+/**
+ * Mock Request.
+ */
+class MockRequest {
+int size;
+MockRequest() {
+this(1);
+}
+MockRequest(int size) {
+this.size = size;
+}
+int getSize() {
+return size;
+}
+}
+
+/**
+ * Mock request limiter.
+ */
+class MockRequestLimiter implements RequestLimiter {
+public void apply(MockRequest request) {
+}
+}
+
+/**
+ * Counter based limiter.
+ */
+static class CounterLimiter implements RateLimiter {
+final int limit;
+int count;
+
+public CounterLimiter(int limit) {
+this.limit = limit;
+this.count = 0;
+}
+
+@Override
+public boolean acquire(int permits) {
+if (++count > limit) {
+return false;
+}
+return true;
+}
+}
+
+/**
+ * Mock hard request limiter.
+ */
+class MockHardRequestLimiter implements RequestLimiter {
+
+RequestLimiter limiter;
+int limitHitCount;
+
+MockHardRequestLimiter(int limit) {
+this(GuavaRateLimiter.of(limit));
+}
+
+MockHardRequestLimiter(RateLimiter limiter) {
+this.limiter = new ComposableRequestLimiter(
+limiter,
+new OverlimitFunction() {
+public void apply(MockRequest request) throws 
OverCapacityException {
+limitHitCount++;
+throw new OverCapacityException("Limit exceeded");
+}
+},
+new CostFunction() {
+public int apply(MockRequest request) {
+return request.getSize();
+}
+},
+NullStatsLogger.INSTANCE);
+}
+
+@Override
+public void apply(MockRequest op) throws OverCapacityException {
+limiter.apply(op);
+}
+
+public int getLimitHitCount() {
+return limitHitCount;
+}
+}
+
+/**
+ * Mock soft request limiter.
+ */
+class MockSoftRequestLimiter implements RequestLimiter 

[21/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java
--
diff --git 
a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java
 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java
new file mode 100644
index 000..f1da33c
--- /dev/null
+++ 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.speculative;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of {@link SpeculativeRequestExecutionPolicy}.
+ */
+public class DefaultSpeculativeRequestExecutionPolicy implements 
SpeculativeRequestExecutionPolicy {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(DefaultSpeculativeRequestExecutionPolicy.class);
+final int firstSpeculativeRequestTimeout;
+final int maxSpeculativeRequestTimeout;
+final float backoffMultiplier;
+int nextSpeculativeRequestTimeout;
+
+public DefaultSpeculativeRequestExecutionPolicy(int 
firstSpeculativeRequestTimeout,
+int 
maxSpeculativeRequestTimeout,
+float backoffMultiplier) {
+this.firstSpeculativeRequestTimeout = firstSpeculativeRequestTimeout;
+this.maxSpeculativeRequestTimeout = maxSpeculativeRequestTimeout;
+this.backoffMultiplier = backoffMultiplier;
+this.nextSpeculativeRequestTimeout = firstSpeculativeRequestTimeout;
+
+if (backoffMultiplier <= 0) {
+throw new IllegalArgumentException("Invalid value provided for 
backoffMultiplier");
+}
+
+// Prevent potential over flow
+if (Math.round((double) maxSpeculativeRequestTimeout * (double) 
backoffMultiplier) > Integer.MAX_VALUE) {
+throw new IllegalArgumentException("Invalid values for 
maxSpeculativeRequestTimeout and backoffMultiplier");
+}
+}
+
+@VisibleForTesting
+int getNextSpeculativeRequestTimeout() {
+return nextSpeculativeRequestTimeout;
+}
+
+/**
+ * Initialize the speculative request execution policy.
+ *
+ * @param scheduler The scheduler service to issue the speculative request
+ * @param requestExecutor The executor is used to issue the actual 
speculative requests
+ */
+@Override
+public void initiateSpeculativeRequest(final ScheduledExecutorService 
scheduler,
+   final SpeculativeRequestExecutor 
requestExecutor) {
+issueSpeculativeRequest(scheduler, requestExecutor);
+}
+
+private void issueSpeculativeRequest(final ScheduledExecutorService 
scheduler,
+ final SpeculativeRequestExecutor 
requestExecutor) {
+Future issueNextRequest = 
requestExecutor.issueSpeculativeRequest();
+issueNextRequest.addEventListener(new FutureEventListener() {
+// we want this handler to run immediately after we push the big 
red button!
+@Override
+public void onSuccess(Boolean issueNextRequest) {
+if (issueNextRequest) {
+scheduleSpeculativeRequest(scheduler, requestExecutor, 
nextSpeculativeRequestTimeout);
+nextSpeculativeRequestTimeout = 
Math.min(maxSpeculativeRequestTimeout,
+(int) (nextSpeculativeRequestTimeout * 
backoffMultiplier));
+} else {

[16/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
--
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
new file mode 100644
index 000..3c53ccf
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.distributedlog.service.DistributedLogClient;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A balancer balances ownerships between two targets.
+ */
+public class SimpleBalancer implements Balancer {
+
+private static final Logger logger = 
LoggerFactory.getLogger(SimpleBalancer.class);
+
+protected final String target1;
+protected final String target2;
+protected final DistributedLogClient targetClient1;
+protected final DistributedLogClient targetClient2;
+protected final MonitorServiceClient targetMonitor1;
+protected final MonitorServiceClient targetMonitor2;
+
+public SimpleBalancer(String name1,
+  DistributedLogClient client1,
+  MonitorServiceClient monitor1,
+  String name2,
+  DistributedLogClient client2,
+  MonitorServiceClient monitor2) {
+this.target1 = name1;
+this.targetClient1 = client1;
+this.targetMonitor1 = monitor1;
+this.target2 = name2;
+this.targetClient2 = client2;
+this.targetMonitor2 = monitor2;
+}
+
+protected static int countNumberStreams(Map 
distribution) {
+int count = 0;
+for (Set streams : distribution.values()) {
+count += streams.size();
+}
+return count;
+}
+
+@Override
+public void balance(int rebalanceWaterMark,
+double rebalanceTolerancePercentage,
+int rebalanceConcurrency,
+Optional rebalanceRateLimiter) {
+// get the ownership distributions from individual targets
+Map distribution1 = 
targetMonitor1.getStreamOwnershipDistribution();
+Map distribution2 = 
targetMonitor2.getStreamOwnershipDistribution();
+
+// get stream counts
+int proxyCount1 = distribution1.size();
+int streamCount1 = countNumberStreams(distribution1);
+int proxyCount2 = distribution2.size();
+int streamCount2 = countNumberStreams(distribution2);
+
+logger.info("'{}' has {} streams by {} proxies; while '{}' has {} 
streams by {} proxies.",
+new Object[] {target1, streamCount1, proxyCount1, target2, 
streamCount2, proxyCount2 });
+
+String source, target;
+Map srcDistribution;
+DistributedLogClient srcClient, targetClient;
+MonitorServiceClient srcMonitor, targetMonitor;
+int srcStreamCount, targetStreamCount;
+if (streamCount1 > streamCount2) {
+source = target1;
+srcStreamCount = streamCount1;
+srcClient = targetClient1;
+srcMonitor = targetMonitor1;
+srcDistribution = distribution1;
+
+target = target2;
+targetStreamCount = streamCount2;
+targetClient = targetClient2;
+targetMonitor = 

[13/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
--
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
new file mode 100644
index 000..c3c5d81
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream.admin;
+
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.exceptions.ChecksumFailedException;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.protocol.util.ProtocolUtils;
+import org.apache.distributedlog.service.ResponseUtils;
+import org.apache.distributedlog.service.stream.StreamManager;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import com.twitter.util.Future;
+import com.twitter.util.FutureTransformer;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+
+/**
+ * Stream admin op.
+ */
+public abstract class StreamAdminOp implements AdminOp {
+
+protected final String stream;
+protected final StreamManager streamManager;
+protected final OpStatsLogger opStatsLogger;
+protected final Stopwatch stopwatch = Stopwatch.createUnstarted();
+protected final Long checksum;
+protected final Feature checksumDisabledFeature;
+
+protected StreamAdminOp(String stream,
+StreamManager streamManager,
+OpStatsLogger statsLogger,
+Long checksum,
+Feature checksumDisabledFeature) {
+this.stream = stream;
+this.streamManager = streamManager;
+this.opStatsLogger = statsLogger;
+// start here in case the operation is failed before executing.
+stopwatch.reset().start();
+this.checksum = checksum;
+this.checksumDisabledFeature = checksumDisabledFeature;
+}
+
+protected Long computeChecksum() {
+return ProtocolUtils.streamOpCRC32(stream);
+}
+
+@Override
+public void preExecute() throws DLException {
+if (!checksumDisabledFeature.isAvailable() && null != checksum) {
+Long serverChecksum = computeChecksum();
+if (null != serverChecksum && !checksum.equals(serverChecksum)) {
+throw new ChecksumFailedException();
+}
+}
+}
+
+/**
+ * Execute the operation.
+ *
+ * @return execute operation
+ */
+protected abstract Future executeOp();
+
+@Override
+public Future execute() {
+return executeOp().transformedBy(new FutureTransformer() {
+
+@Override
+public WriteResponse map(WriteResponse response) {
+opStatsLogger.registerSuccessfulEvent(
+stopwatch.elapsed(TimeUnit.MICROSECONDS));
+return response;
+}
+
+@Override
+public WriteResponse handle(Throwable cause) {
+opStatsLogger.registerFailedEvent(
+stopwatch.elapsed(TimeUnit.MICROSECONDS));
+return 
ResponseUtils.write(ResponseUtils.exceptionToHeader(cause));
+}
+
+});
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java
--
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java
 

[22/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java
--
diff --git 
a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java
 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java
new file mode 100644
index 000..666fa31
--- /dev/null
+++ 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.common.zookeeper.ServerSet;
+import org.apache.distributedlog.service.DLSocketAddress;
+import com.twitter.finagle.ChannelException;
+import com.twitter.finagle.NoBrokersAvailableException;
+import com.twitter.finagle.stats.Counter;
+import com.twitter.finagle.stats.Gauge;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import com.twitter.finagle.stats.StatsReceiver;
+import com.twitter.util.Function0;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.tuple.Pair;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.Seq;
+
+/**
+ * Consistent Hashing Based {@link RoutingService}.
+ */
+public class ConsistentHashRoutingService extends ServerSetRoutingService {
+
+private static final Logger logger = 
LoggerFactory.getLogger(ConsistentHashRoutingService.class);
+
+@Deprecated
+public static ConsistentHashRoutingService of(ServerSetWatcher 
serverSetWatcher, int numReplicas) {
+return new ConsistentHashRoutingService(serverSetWatcher, numReplicas, 
300, NullStatsReceiver.get());
+}
+
+/**
+ * Builder helper class to build a consistent hash bashed {@link 
RoutingService}.
+ *
+ * @return builder to build a consistent hash based {@link RoutingService}.
+ */
+public static Builder newBuilder() {
+return new Builder();
+}
+
+/**
+ * Builder for building consistent hash based routing service.
+ */
+public static class Builder implements RoutingService.Builder {
+
+private ServerSet serverSet;
+private boolean resolveFromName = false;
+private int numReplicas;
+private int blackoutSeconds = 300;
+private StatsReceiver statsReceiver = NullStatsReceiver.get();
+
+private Builder() {}
+
+public Builder serverSet(ServerSet serverSet) {
+this.serverSet = serverSet;
+return this;
+}
+
+public Builder resolveFromName(boolean enabled) {
+this.resolveFromName = enabled;
+return this;
+}
+
+public Builder numReplicas(int numReplicas) {
+this.numReplicas = numReplicas;
+return this;
+}
+
+public Builder blackoutSeconds(int seconds) {
+this.blackoutSeconds = seconds;
+return this;
+}
+
+public Builder statsReceiver(StatsReceiver statsReceiver) {
+this.statsReceiver = statsReceiver;
+return this;
+}
+
+   

[19/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-protocol/pom.xml
--
diff --git a/distributedlog-proxy-protocol/pom.xml 
b/distributedlog-proxy-protocol/pom.xml
new file mode 100644
index 000..0f6a85f
--- /dev/null
+++ b/distributedlog-proxy-protocol/pom.xml
@@ -0,0 +1,130 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+  4.0.0
+  
+org.apache.distributedlog
+distributedlog
+0.5.0-incubating-SNAPSHOT
+  
+  distributedlog-proxy-protocol
+  Apache DistributedLog :: Proxy Protocol
+  
+
+  org.apache.distributedlog
+  distributedlog-protocol
+  ${project.version}
+
+
+  org.apache.thrift
+  libthrift
+  ${libthrift.version}
+
+
+  com.twitter
+  scrooge-core_2.11
+  ${scrooge.version}
+
+
+  com.twitter
+  finagle-core_2.11
+  ${finagle.version}
+
+
+  com.twitter
+  finagle-thrift_2.11
+  ${finagle.version}
+
+  
+  
+
+  
+com.twitter
+scrooge-maven-plugin
+${scrooge-maven-plugin.version}
+
+  java
+  
+--finagle
+  
+
+
+  
+thrift-sources
+generate-sources
+
+  compile
+
+  
+
+  
+  
+org.apache.maven.plugins
+maven-jar-plugin
+${maven-jar-plugin.version}
+
+  
+
+  test-jar
+
+  
+
+  
+  
+org.codehaus.mojo
+findbugs-maven-plugin
+
+  
${basedir}/src/main/resources/findbugsExclude.xml
+
+  
+  
+org.apache.maven.plugins
+maven-checkstyle-plugin
+${maven-checkstyle-plugin.version}
+
+  
+com.puppycrawl.tools
+checkstyle
+${puppycrawl.checkstyle.version}
+  
+  
+org.apache.distributedlog
+distributedlog-build-tools
+${project.version}
+  
+
+
+  distributedlog/checkstyle.xml
+  
distributedlog/suppressions.xml
+  true
+  true
+  false
+  true
+
+
+  
+test-compile
+
+  check
+
+  
+
+  
+
+  
+

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/ProtocolUtils.java
--
diff --git 
a/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/ProtocolUtils.java
 
b/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/ProtocolUtils.java
new file mode 100644
index 000..1f91968
--- /dev/null
+++ 
b/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/ProtocolUtils.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.protocol.util;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import org.apache.distributedlog.DLSN;
+import java.util.zip.CRC32;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
+import org.apache.distributedlog.thrift.service.ResponseHeader;
+
+/**
+ * With CRC embedded in the application, we have to keep track of per api crc. 
Ideally this
+ * would be done by thrift.
+ */
+public class ProtocolUtils {
+
+// For request payload checksum
+private static final ThreadLocal requestCRC = new 
ThreadLocal() {
+@Override
+protected CRC32 initialValue() {
+return new CRC32();
+}
+};
+
+/**
+ * Generate crc32 for WriteOp.
+ */
+public static Long writeOpCRC32(String 

[29/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java
--
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java
deleted file mode 100644
index 1300187..000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java
+++ /dev/null
@@ -1,1200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.LogRecordSetBuffer;
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.distributedlog.client.ownership.OwnershipCache;
-import org.apache.distributedlog.client.proxy.ClusterClient;
-import org.apache.distributedlog.client.proxy.HostProvider;
-import org.apache.distributedlog.client.proxy.ProxyClient;
-import org.apache.distributedlog.client.proxy.ProxyClientManager;
-import org.apache.distributedlog.client.proxy.ProxyListener;
-import org.apache.distributedlog.client.resolver.RegionResolver;
-import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.client.routing.RoutingService.RoutingContext;
-import org.apache.distributedlog.client.stats.ClientStats;
-import org.apache.distributedlog.client.stats.OpStats;
-import org.apache.distributedlog.exceptions.DLClientClosedException;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.ServiceUnavailableException;
-import org.apache.distributedlog.exceptions.StreamUnavailableException;
-import org.apache.distributedlog.service.DLSocketAddress;
-import org.apache.distributedlog.service.DistributedLogClient;
-import org.apache.distributedlog.thrift.service.BulkWriteResponse;
-import org.apache.distributedlog.thrift.service.HeartbeatOptions;
-import org.apache.distributedlog.thrift.service.ResponseHeader;
-import org.apache.distributedlog.thrift.service.ServerInfo;
-import org.apache.distributedlog.thrift.service.ServerStatus;
-import org.apache.distributedlog.thrift.service.StatusCode;
-import org.apache.distributedlog.thrift.service.WriteContext;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.ProtocolUtils;
-import com.twitter.finagle.CancelledRequestException;
-import com.twitter.finagle.ConnectionFailedException;
-import com.twitter.finagle.Failure;
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.finagle.RequestTimeoutException;
-import com.twitter.finagle.ServiceException;
-import com.twitter.finagle.ServiceTimeoutException;
-import com.twitter.finagle.WriteException;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.stats.StatsReceiver;
-import com.twitter.finagle.thrift.ClientId;
-import com.twitter.util.Duration;
-import com.twitter.util.Function;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Throw;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Matcher;
-import 

[04/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
--
diff --git 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
 
b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
deleted file mode 100644
index 3f28c42..000
--- 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream.admin;
-
-import com.google.common.base.Stopwatch;
-import org.apache.distributedlog.exceptions.ChecksumFailedException;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.service.stream.StreamManager;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.ProtocolUtils;
-import com.twitter.util.Future;
-import com.twitter.util.FutureTransformer;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-
-/**
- * Stream admin op.
- */
-public abstract class StreamAdminOp implements AdminOp {
-
-protected final String stream;
-protected final StreamManager streamManager;
-protected final OpStatsLogger opStatsLogger;
-protected final Stopwatch stopwatch = Stopwatch.createUnstarted();
-protected final Long checksum;
-protected final Feature checksumDisabledFeature;
-
-protected StreamAdminOp(String stream,
-StreamManager streamManager,
-OpStatsLogger statsLogger,
-Long checksum,
-Feature checksumDisabledFeature) {
-this.stream = stream;
-this.streamManager = streamManager;
-this.opStatsLogger = statsLogger;
-// start here in case the operation is failed before executing.
-stopwatch.reset().start();
-this.checksum = checksum;
-this.checksumDisabledFeature = checksumDisabledFeature;
-}
-
-protected Long computeChecksum() {
-return ProtocolUtils.streamOpCRC32(stream);
-}
-
-@Override
-public void preExecute() throws DLException {
-if (!checksumDisabledFeature.isAvailable() && null != checksum) {
-Long serverChecksum = computeChecksum();
-if (null != serverChecksum && !checksum.equals(serverChecksum)) {
-throw new ChecksumFailedException();
-}
-}
-}
-
-/**
- * Execute the operation.
- *
- * @return execute operation
- */
-protected abstract Future executeOp();
-
-@Override
-public Future execute() {
-return executeOp().transformedBy(new FutureTransformer() {
-
-@Override
-public WriteResponse map(WriteResponse response) {
-opStatsLogger.registerSuccessfulEvent(
-stopwatch.elapsed(TimeUnit.MICROSECONDS));
-return response;
-}
-
-@Override
-public WriteResponse handle(Throwable cause) {
-opStatsLogger.registerFailedEvent(
-stopwatch.elapsed(TimeUnit.MICROSECONDS));
-return 
ResponseUtils.write(ResponseUtils.exceptionToHeader(cause));
-}
-
-});
-}
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java
--
diff --git 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java
 
b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java
deleted file 

[11/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java
--
diff --git 
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java
 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java
new file mode 100644
index 000..d0a2f88
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.feature.DefaultFeatureProvider;
+import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.feature.SettableFeature;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test Case for {@link 
org.apache.distributedlog.exceptions.RegionUnavailableException}.
+ */
+public class TestRegionUnavailable extends DistributedLogServerTestCase {
+
+/**
+ * A feature provider for testing.
+ */
+public static class TestFeatureProvider extends DefaultFeatureProvider {
+
+public TestFeatureProvider(String rootScope,
+   DistributedLogConfiguration conf,
+   StatsLogger statsLogger) {
+super(rootScope, conf, statsLogger);
+}
+
+@Override
+protected Feature makeFeature(String featureName) {
+if 
(featureName.contains(ServerFeatureKeys.REGION_STOP_ACCEPT_NEW_STREAM.name().toLowerCase()))
 {
+return new SettableFeature(featureName, 1);
+}
+return super.makeFeature(featureName);
+}
+
+@Override
+protected FeatureProvider makeProvider(String fullScopeName) {
+return super.makeProvider(fullScopeName);
+}
+}
+
+private final int numServersPerDC = 3;
+private final List localCluster;
+private final List remoteCluster;
+private TwoRegionDLClient client;
+
+public TestRegionUnavailable() {
+super(true);
+this.localCluster = new ArrayList();
+this.remoteCluster = new ArrayList();
+}
+
+@Before
+@Override
+public void setup() throws Exception {
+DistributedLogConfiguration localConf = new 
DistributedLogConfiguration();
+localConf.addConfiguration(conf);
+localConf.setFeatureProviderClass(TestFeatureProvider.class);
+DistributedLogConfiguration remoteConf = new 
DistributedLogConfiguration();
+remoteConf.addConfiguration(conf);
+super.setup();
+int localPort = 9010;
+int remotePort = 9020;
+for (int i = 0; i < numServersPerDC; i++) {
+localCluster.add(createDistributedLogServer(localConf, localPort + 
i));
+remoteCluster.add(createDistributedLogServer(remoteConf, 
remotePort + i));
+}
+Map regionMap = new HashMap();
+for (DLServer server : localCluster) {
+regionMap.put(server.getAddress(), "local");
+}
+for (DLServer server : remoteCluster) {
+regionMap.put(server.getAddress(), "remote");
+}
+client = createTwoRegionDLClient("two_regions_client", regionMap);
+
+}
+
+private void registerStream(String streamName) {
+for (DLServer server : localCluster) {
+client.localRoutingService.addHost(streamName, 
server.getAddress());
+}
+client.remoteRoutingService.addHost(streamName, 

[14/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
--
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
new file mode 100644
index 000..c0c0972
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
@@ -0,0 +1,926 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.AlreadyClosedException;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.OverCapacityException;
+import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
+import org.apache.distributedlog.exceptions.StatusCode;
+import org.apache.distributedlog.exceptions.StreamNotReadyException;
+import org.apache.distributedlog.exceptions.StreamUnavailableException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.io.Abortables;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.service.FatalErrorHandler;
+import org.apache.distributedlog.service.ServerFeatureKeys;
+import org.apache.distributedlog.service.config.ServerConfiguration;
+import org.apache.distributedlog.service.config.StreamConfigProvider;
+import org.apache.distributedlog.service.stream.limiter.StreamRequestLimiter;
+import org.apache.distributedlog.service.streamset.Partition;
+import org.apache.distributedlog.stats.BroadCastStatsLogger;
+import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.TimeSequencer;
+import org.apache.distributedlog.util.Utils;
+import com.twitter.util.Duration;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import com.twitter.util.TimeoutException;
+import com.twitter.util.Timer;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+/**
+ * Implementation of {@link Stream}.
+ */
+public class StreamImpl implements Stream {
+
+private static final Logger logger = 
LoggerFactory.getLogger(StreamImpl.class);
+
+/**
+ * The status of the stream.
+ *
+ * The status change of the stream should just go in one direction. If 
a stream hits
+ * any error, the stream should be put in error state. If a stream is in 
error state,
+ * it should be removed and not reused anymore.
+ */
+public enum StreamStatus {
+UNINITIALIZED(-1),
+INITIALIZING(0),
+INITIALIZED(1),
+CLOSING(-4),
+CLOSED(-5),
+// if a stream is in error state, it should be abort during closing.
+ 

[25/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java
--
diff --git 
a/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java
 
b/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java
deleted file mode 100644
index f44cddd..000
--- 
a/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
-import org.apache.distributedlog.service.DLSocketAddress;
-import com.twitter.finagle.Address;
-import com.twitter.finagle.Addresses;
-import com.twitter.finagle.ChannelWriteException;
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import org.junit.Test;
-
-/**
- * Test Case for {@link ConsistentHashRoutingService}.
- */
-public class TestConsistentHashRoutingService {
-
-@Test(timeout = 6)
-public void testBlackoutHost() throws Exception {
-TestName name = new TestName();
-RoutingService routingService = 
ConsistentHashRoutingService.newBuilder()
-.serverSet(new NameServerSet(name))
-.resolveFromName(true)
-.numReplicas(997)
-.blackoutSeconds(2)
-.build();
-
-InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 
3181);
-Address address = Addresses.newInetAddress(inetAddress);
-List addresses = new ArrayList(1);
-addresses.add(address);
-name.changeAddrs(addresses);
-
-routingService.startService();
-
-RoutingService.RoutingContext routingContext =
-RoutingService.RoutingContext.of(new DefaultRegionResolver());
-
-String streamName = "test-blackout-host";
-assertEquals(inetAddress, routingService.getHost(streamName, 
routingContext));
-routingService.removeHost(inetAddress, new ChannelWriteException(new 
IOException("test exception")));
-try {
-routingService.getHost(streamName, routingContext);
-fail("Should fail to get host since no brokers are available");
-} catch (NoBrokersAvailableException nbae) {
-// expected
-}
-
-TimeUnit.SECONDS.sleep(3);
-assertEquals(inetAddress, routingService.getHost(streamName, 
routingContext));
-
-routingService.stopService();
-}
-
-@Test(timeout = 6)
-public void testPerformServerSetChangeOnName() throws Exception {
-TestName name = new TestName();
-ConsistentHashRoutingService routingService = 
(ConsistentHashRoutingService)
-ConsistentHashRoutingService.newBuilder()
-.serverSet(new NameServerSet(name))
-.resolveFromName(true)
-.numReplicas(997)
-.build();
-
-int basePort = 3180;
-int numHosts = 4;
-List addresses1 = Lists.newArrayListWithExpectedSize(4);
-List addresses2 = Lists.newArrayListWithExpectedSize(4);
-List addresses3 = 

[09/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
--
diff --git 
a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
 
b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
deleted file mode 100644
index 267f75a..000
--- 
a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.stats;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.health.HealthCheckRegistry;
-import com.codahale.metrics.servlets.AdminServlet;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-
-/**
- * Starts a jetty server on a configurable port to export stats.
- */
-public class ServletReporter {
-
-private final MetricRegistry metricRegistry;
-private final HealthCheckRegistry healthCheckRegistry;
-private final int port;
-private final Server jettyServer;
-
-public ServletReporter(MetricRegistry metricRegistry,
-   HealthCheckRegistry healthCheckRegistry,
-   int port) {
-this.metricRegistry = metricRegistry;
-this.healthCheckRegistry = healthCheckRegistry;
-this.port = port;
-this.jettyServer = new Server(port);
-}
-
-public void start() throws Exception {
-ServletContextHandler context = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
-context.setContextPath("/");
-jettyServer.setHandler(context);
-
-context.addEventListener(new 
HealthCheckServletContextListener(healthCheckRegistry));
-context.addEventListener(new 
MetricsServletContextListener(metricRegistry));
-context.addServlet(new ServletHolder(new AdminServlet()), "/*");
-
-jettyServer.start();
-}
-
-public void stop() throws Exception {
-jettyServer.stop();
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java
--
diff --git 
a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java
 
b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java
deleted file mode 100644
index 5bdb3ce..000
--- 
a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Extension of {@link org.apache.bookkeeper.stats.CodahaleMetricsProvider}.
- */
-package org.apache.bookkeeper.stats;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/ClientUtils.java
--
diff --git 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ClientUtils.java
 
b/distributedlog-service/src/main/java/org/apache/distributedlog/service/ClientUtils.java
deleted file 

[30/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

2017-06-12 Thread sijie
DL-205: Remove StatusCode dependency on DLException

- Remove StatusCode from exceptions. Use integer as exception codes.
- Also re-organize the modules:

- [ ] distributedlog-protocol (for core structures) and distributedlog-core 
(for core library).
- [ ] proxy: distributedlog-proxy-protocol (new module for thrift generated 
protocol), distributedlog-proxy-client (proxy client) and 
distributedlog-proxy-server (proxy server)
- [ ] benchmark & tutorials.

Author: Sijie Guo 

Reviewers: Jia Zhai , Leigh Stewart 

Closes #131 from sijie/new_layout


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/c44e0278
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/c44e0278
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/c44e0278

Branch: refs/heads/master
Commit: c44e0278ececde1942d5c43b06c7a12d375974a8
Parents: 52c0eef
Author: Sijie Guo 
Authored: Mon Jun 12 08:45:01 2017 -0700
Committer: Sijie Guo 
Committed: Mon Jun 12 08:45:01 2017 -0700

--
 distributedlog-benchmark/pom.xml|2 +-
 .../distributedlog/benchmark/WriterWorker.java  |3 +-
 distributedlog-client/pom.xml   |  172 ---
 .../distributedlog/client/ClientConfig.java |  187 ---
 .../client/DistributedLogClientImpl.java| 1200 --
 .../client/DistributedLogMultiStreamWriter.java |  486 ---
 .../client/monitor/MonitorServiceClient.java|   68 -
 .../client/monitor/package-info.java|   21 -
 .../client/ownership/OwnershipCache.java|  235 
 .../client/ownership/package-info.java  |   21 -
 .../distributedlog/client/package-info.java |   21 -
 .../client/proxy/ClusterClient.java |   51 -
 .../client/proxy/HostProvider.java  |   35 -
 .../client/proxy/ProxyClient.java   |  165 ---
 .../client/proxy/ProxyClientManager.java|  362 --
 .../client/proxy/ProxyListener.java |   50 -
 .../client/proxy/package-info.java  |   21 -
 .../client/resolver/DefaultRegionResolver.java  |   85 --
 .../client/resolver/RegionResolver.java |   43 -
 .../client/resolver/package-info.java   |   21 -
 .../routing/ConsistentHashRoutingService.java   |  500 
 .../client/routing/NameServerSet.java   |  263 
 .../client/routing/RegionsRoutingService.java   |  192 ---
 .../client/routing/RoutingService.java  |  206 ---
 .../client/routing/RoutingServiceProvider.java  |   39 -
 .../client/routing/RoutingUtils.java|   88 --
 .../client/routing/ServerSetRoutingService.java |  274 
 .../client/routing/ServerSetWatcher.java|   71 --
 .../routing/SingleHostRoutingService.java   |  128 --
 .../distributedlog/client/routing/TestName.java |   49 -
 .../client/routing/TwitterServerSetWatcher.java |   83 --
 .../client/routing/package-info.java|   21 -
 .../client/serverset/DLZkServerSet.java |   91 --
 .../client/serverset/package-info.java  |   21 -
 ...efaultSpeculativeRequestExecutionPolicy.java |  119 --
 .../SpeculativeRequestExecutionPolicy.java  |   34 -
 .../speculative/SpeculativeRequestExecutor.java |   33 -
 .../client/speculative/package-info.java|   21 -
 .../client/stats/ClientStats.java   |  108 --
 .../client/stats/ClientStatsLogger.java |   91 --
 .../distributedlog/client/stats/OpStats.java|   82 --
 .../client/stats/OpStatsLogger.java |   50 -
 .../client/stats/OwnershipStatsLogger.java  |  115 --
 .../client/stats/package-info.java  |   21 -
 .../distributedlog/service/DLSocketAddress.java |  161 ---
 .../service/DistributedLogClient.java   |  108 --
 .../service/DistributedLogClientBuilder.java|  608 -
 .../distributedlog/service/package-info.java|   21 -
 .../src/main/resources/findbugsExclude.xml  |   23 -
 .../TestDistributedLogMultiStreamWriter.java|  383 --
 .../client/ownership/TestOwnershipCache.java|  207 ---
 .../proxy/MockDistributedLogServices.java   |  144 ---
 .../client/proxy/MockProxyClientBuilder.java|   49 -
 .../client/proxy/MockThriftClient.java  |   32 -
 .../client/proxy/TestProxyClientManager.java|  368 --
 .../TestConsistentHashRoutingService.java   |  417 --
 .../client/routing/TestInetNameResolution.java  |   73 --
 .../routing/TestRegionsRoutingService.java  |  133 --
 .../client/routing/TestRoutingService.java  |  146 ---
 ...efaultSpeculativeRequestExecutionPolicy.java |  105 --
 .../TestDistributedLogClientBuilder.java|   49 -
 .../src/test/resources/log4j.properties |   51 -
 

Re: [VOTE] Merge DistributedLog as the subproject of Apache BookKeeper

2017-06-12 Thread Leigh Stewart
+1

On Fri, Jun 9, 2017 at 2:48 PM, Uma gangumalla  wrote:

> +1 (binding)
>
> Regards,
> Uma
>
> On Thu, Jun 8, 2017 at 5:21 PM, Sijie Guo  wrote:
>
> > ( /cc bookkeeper dev@ and incubator general@ for awareness )
> >
> > Hi all,
> >
> > There was a joint discussion between BookKeeper PMC and DistributedLog
> PPMC
> > about moving the development of DistributedLog as part of Apache
> > BookKeeper. The reasons behind it are:
> >
> > First, DistributedLog is born as an extension to BookKeeper, to offer
> > continuous log streams as the service. The ledger API in bookkeeper is a
> > lower level API and has learning curves, while the log stream API in
> > distributedlog is a higher level API that simplifies the usage. The
> > combination of ledger API and stream API would offer a better
> > developer/user experience for applications.
> >
> > Secondly, using ledgers to build continuous (re-openable) log stream is a
> > very common pattern for BookKeeper use cases. We did this for HDFS
> namenode
> > journal, for Hedwig, for DistributedLog, and for Pulsar. The same pattern
> > has been implemented again and again. Merge DistributedLog (also
> > ManagedLedger in Pulsar) with BookKeeper will consolidate all the
> > development efforts around this common 'log stream' pattern.
> >
> > Thirdly, the 'log' stream abstraction is a good abstraction for both
> > messaging and streaming. Internally at BookKeeper, there are a few places
> > that can use such 'messaging' facility to improve bookkeeper itself. the
> > log stream in DistributedLog can be used internally at bookkeeper for
> > streaming changes as well.
> >
> > We choose merging DistributedLog as subproject rather than modules. It
> is a
> > softer starting point to avoid disrupting the folks who are depending on
> > the ledger api alone. The BookKeeper PMC and DistributedLog PPMC has
> > achieved initial consensus on this merge. There is an official VOTE
> ongoing
> > in bookkeeper PMC. We'd like to bring this to the distributedlog
> community
> > for a community vote following the guidelines here
> > .
> >
> > Please vote +1 if in favor of merging DistributedLog to BookKeeper, and
> -1
> > if not. The vote will be open until Tuesday 13rd June, 18:00 PST.
> >
> > - Sijie
> >
>