Repository: curator Updated Branches: refs/heads/CURATOR-3.0 df949e7a2 -> 0fe4d969f
interim work - updated APIs make sure old tests work Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2c0fca86 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2c0fca86 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2c0fca86 Branch: refs/heads/CURATOR-3.0 Commit: 2c0fca86ca3c25c9777e1b5c3bb3eea6cb0be8da Parents: ea36769 Author: randgalt <randg...@apache.org> Authored: Fri Sep 25 19:14:53 2015 -0500 Committer: randgalt <randg...@apache.org> Committed: Fri Sep 25 19:14:53 2015 -0500 ---------------------------------------------------------------------- .../api/AddStatConfigEnsembleable.java | 5 +- .../framework/api/BackgroundEnsembleable.java | 7 + .../framework/api/BackgroundStatable.java | 24 - .../curator/framework/api/Configurable.java | 31 -- .../framework/api/ConfigureEnsembleable.java | 32 ++ .../curator/framework/api/GetConfigBuilder.java | 9 +- .../api/JoinAddStatConfigEnsembleable.java | 3 +- .../api/JoinLeaveStatConfigEnsembleable.java | 3 +- .../api/JoinStatConfigEnsembleable.java | 5 +- .../framework/api/JoinStatConfigurable.java | 2 +- .../api/LeaveAddStatConfigEnsembleable.java | 3 +- .../api/LeaveStatConfigEnsembleable.java | 5 +- .../curator/framework/api/ReconfigBuilder.java | 4 +- .../framework/api/StatConfigEnsembleable.java | 26 - .../curator/framework/api/StatEnsembleable.java | 26 - .../api/WatchBackgroundEnsembleable.java | 7 + .../framework/imps/GetConfigBuilderImpl.java | 125 ++++- .../framework/imps/ReconfigBuilderImpl.java | 110 +++-- .../framework/imps/TestReconfiguration.java | 474 +++++-------------- .../framework/imps/TestReconfigurationX.java | 418 ++++++++++++++++ 20 files changed, 798 insertions(+), 521 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/main/java/org/apache/curator/framework/api/AddStatConfigEnsembleable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddStatConfigEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddStatConfigEnsembleable.java index 16f78a2..c60f617 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/AddStatConfigEnsembleable.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddStatConfigEnsembleable.java @@ -25,8 +25,9 @@ package org.apache.curator.framework.api; * mixing concepts that can't be used together. */ public interface AddStatConfigEnsembleable extends - Addable<StatConfigEnsembleable>, - StatConfigEnsembleable + Addable<Statable<ConfigureEnsembleable>>, + ConfigureEnsembleable, + Statable<ConfigureEnsembleable> { } http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundEnsembleable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundEnsembleable.java new file mode 100644 index 0000000..ae2b226 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundEnsembleable.java @@ -0,0 +1,7 @@ +package org.apache.curator.framework.api; + +public interface BackgroundEnsembleable<T> extends + Backgroundable<Ensembleable<T>>, + Ensembleable<T> +{ +} http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundStatable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundStatable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundStatable.java deleted file mode 100644 index 77c4e96..0000000 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundStatable.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.framework.api; - -public interface BackgroundStatable<T> extends - Backgroundable<T>, - Statable<T> { -} http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/main/java/org/apache/curator/framework/api/Configurable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Configurable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Configurable.java deleted file mode 100644 index 2bc0494..0000000 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/Configurable.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.curator.framework.api; - -public interface Configurable -{ - - /** - * Sets the configuration version to use. - * @param config The version of the configuration. - * @throws Exception - */ - StatEnsembleable<byte[]> fromConfig(long config) throws Exception; -} http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/main/java/org/apache/curator/framework/api/ConfigureEnsembleable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/ConfigureEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/ConfigureEnsembleable.java new file mode 100644 index 0000000..8c739bc --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/ConfigureEnsembleable.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.api; + +public interface ConfigureEnsembleable extends + Ensembleable<byte[]> +{ + + /** + * Sets the configuration version to use. + * @param config The version of the configuration. + * @throws Exception + */ + Ensembleable<byte[]> fromConfig(long config) throws Exception; +} http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java index c42e4cb..d137f28 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java @@ -20,10 +20,9 @@ package org.apache.curator.framework.api; public interface GetConfigBuilder extends - Watchable<BackgroundStatable<Ensembleable<byte[]>>>, - BackgroundStatable<Ensembleable<byte[]>>, - Ensembleable<byte[]> + Ensembleable<byte[]>, + Backgroundable<Ensembleable<byte[]>>, + Watchable<BackgroundEnsembleable<byte[]>>, + Statable<WatchBackgroundEnsembleable<byte[]>> { } - - http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/main/java/org/apache/curator/framework/api/JoinAddStatConfigEnsembleable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/JoinAddStatConfigEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinAddStatConfigEnsembleable.java index a905dd1..4356ba7 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/JoinAddStatConfigEnsembleable.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinAddStatConfigEnsembleable.java @@ -27,7 +27,8 @@ package org.apache.curator.framework.api; public interface JoinAddStatConfigEnsembleable extends Joinable<AddStatConfigEnsembleable>, Addable<JoinStatConfigurable>, - StatConfigEnsembleable + ConfigureEnsembleable, + Statable<ConfigureEnsembleable> { } http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/main/java/org/apache/curator/framework/api/JoinLeaveStatConfigEnsembleable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/JoinLeaveStatConfigEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinLeaveStatConfigEnsembleable.java index 9642297..fac16a9 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/JoinLeaveStatConfigEnsembleable.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinLeaveStatConfigEnsembleable.java @@ -27,7 +27,8 @@ package org.apache.curator.framework.api; public interface JoinLeaveStatConfigEnsembleable extends Joinable<LeaveStatConfigEnsembleable>, Leaveable<JoinStatConfigEnsembleable>, - StatConfigEnsembleable + ConfigureEnsembleable, + Statable<ConfigureEnsembleable> { } http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigEnsembleable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigEnsembleable.java index 5fe7a8c..7ab51e2 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigEnsembleable.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigEnsembleable.java @@ -25,8 +25,9 @@ package org.apache.curator.framework.api; * mixing concepts that can't be used together. */ public interface JoinStatConfigEnsembleable extends - Joinable<StatConfigEnsembleable>, - StatConfigEnsembleable + Joinable<ConfigureEnsembleable>, + ConfigureEnsembleable, + Statable<ConfigureEnsembleable> { } http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigurable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigurable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigurable.java index ef17ef4..18713e4 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigurable.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigurable.java @@ -25,6 +25,6 @@ package org.apache.curator.framework.api; * mixing concepts that can't be used together. */ public interface JoinStatConfigurable extends - Joinable<Configurable> + Joinable<ConfigureEnsembleable> { } http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveAddStatConfigEnsembleable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveAddStatConfigEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveAddStatConfigEnsembleable.java index 7912d45..b5125dc 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveAddStatConfigEnsembleable.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveAddStatConfigEnsembleable.java @@ -27,6 +27,7 @@ package org.apache.curator.framework.api; public interface LeaveAddStatConfigEnsembleable extends Leaveable<AddStatConfigEnsembleable>, Addable<LeaveStatConfigEnsembleable>, - StatConfigEnsembleable + ConfigureEnsembleable, + Statable<ConfigureEnsembleable> { } http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveStatConfigEnsembleable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveStatConfigEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveStatConfigEnsembleable.java index ddad854..1464d26 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveStatConfigEnsembleable.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveStatConfigEnsembleable.java @@ -25,8 +25,9 @@ package org.apache.curator.framework.api; * mixing concepts that can't be used together. */ public interface LeaveStatConfigEnsembleable extends - Leaveable<StatConfigEnsembleable>, - StatConfigEnsembleable + Leaveable<Statable<ConfigureEnsembleable>>, + ConfigureEnsembleable, + Statable<ConfigureEnsembleable> { } http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java index 438abcf..d8a2cc2 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java @@ -20,9 +20,7 @@ package org.apache.curator.framework.api; public interface ReconfigBuilder extends - Joinable<LeaveAddStatConfigEnsembleable>, - Leaveable<JoinAddStatConfigEnsembleable>, - Addable<JoinLeaveStatConfigEnsembleable>, + ReconfigBuilderMain, Backgroundable<ReconfigBuilderMain> { } http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/main/java/org/apache/curator/framework/api/StatConfigEnsembleable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/StatConfigEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/StatConfigEnsembleable.java deleted file mode 100644 index 4700c8c..0000000 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/StatConfigEnsembleable.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.curator.framework.api; - -public interface StatConfigEnsembleable extends - Configurable, - StatEnsembleable<byte[]> -{ -} http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/main/java/org/apache/curator/framework/api/StatEnsembleable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/StatEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/StatEnsembleable.java deleted file mode 100644 index 0993b50..0000000 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/StatEnsembleable.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.curator.framework.api; - -public interface StatEnsembleable<T> extends - Statable<Ensembleable<T>>, - Ensembleable<T> -{ -} http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/main/java/org/apache/curator/framework/api/WatchBackgroundEnsembleable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/WatchBackgroundEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/WatchBackgroundEnsembleable.java new file mode 100644 index 0000000..073cfe3 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/WatchBackgroundEnsembleable.java @@ -0,0 +1,7 @@ +package org.apache.curator.framework.api; + +public interface WatchBackgroundEnsembleable<T> extends + Watchable<BackgroundEnsembleable<T>>, + BackgroundEnsembleable<T> +{ +} http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java index 5468bd4..09cb0ab 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java @@ -22,12 +22,13 @@ package org.apache.curator.framework.imps; import org.apache.curator.RetryLoop; import org.apache.curator.TimeTrace; import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.BackgroundStatable; +import org.apache.curator.framework.api.BackgroundEnsembleable; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.api.Ensembleable; import org.apache.curator.framework.api.GetConfigBuilder; +import org.apache.curator.framework.api.WatchBackgroundEnsembleable; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; @@ -51,31 +52,92 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati } @Override - public Ensembleable<byte[]> storingStatIn(Stat stat) + public WatchBackgroundEnsembleable<byte[]> storingStatIn(Stat stat) { this.stat = stat; - return this; + return new WatchBackgroundEnsembleable<byte[]>() + { + @Override + public Ensembleable<byte[]> inBackground() + { + return GetConfigBuilderImpl.this.inBackground(); + } + + @Override + public Ensembleable<byte[]> inBackground(Object context) + { + return GetConfigBuilderImpl.this.inBackground(context); + } + + @Override + public Ensembleable<byte[]> inBackground(BackgroundCallback callback) + { + return GetConfigBuilderImpl.this.inBackground(callback); + } + + @Override + public Ensembleable<byte[]> inBackground(BackgroundCallback callback, Object context) + { + return GetConfigBuilderImpl.this.inBackground(callback, context); + } + + @Override + public Ensembleable<byte[]> inBackground(BackgroundCallback callback, Executor executor) + { + return GetConfigBuilderImpl.this.inBackground(callback, executor); + } + + @Override + public Ensembleable<byte[]> inBackground(BackgroundCallback callback, Object context, Executor executor) + { + return GetConfigBuilderImpl.this.inBackground(callback, context, executor); + } + + @Override + public byte[] forEnsemble() throws Exception + { + return GetConfigBuilderImpl.this.forEnsemble(); + } + + @Override + public BackgroundEnsembleable<byte[]> watched() + { + return GetConfigBuilderImpl.this.watched(); + } + + @Override + public BackgroundEnsembleable<byte[]> usingWatcher(Watcher watcher) + { + return GetConfigBuilderImpl.this.usingWatcher(watcher); + } + + @Override + public BackgroundEnsembleable<byte[]> usingWatcher(CuratorWatcher watcher) + { + return GetConfigBuilderImpl.this.usingWatcher(watcher); + } + }; } @Override - public BackgroundStatable<Ensembleable<byte[]>> watched() + public BackgroundEnsembleable<byte[]> watched() { watching = new Watching(true); - return this; + return new InternalBackgroundEnsembleable(); } @Override - public GetConfigBuilder usingWatcher(Watcher watcher) + public BackgroundEnsembleable<byte[]> usingWatcher(Watcher watcher) { watching = new Watching(client, watcher); - return this; + return new InternalBackgroundEnsembleable(); } @Override - public GetConfigBuilder usingWatcher(final CuratorWatcher watcher) + public BackgroundEnsembleable<byte[]> usingWatcher(CuratorWatcher watcher) { watching = new Watching(client, watcher); - return this; + return new InternalBackgroundEnsembleable(); } @Override @@ -185,4 +247,49 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati trace.commit(); } } + + private class InternalBackgroundEnsembleable implements BackgroundEnsembleable<byte[]> + { + @Override + public Ensembleable<byte[]> inBackground() + { + return GetConfigBuilderImpl.this.inBackground(); + } + + @Override + public Ensembleable<byte[]> inBackground(Object context) + { + return GetConfigBuilderImpl.this.inBackground(context); + } + + @Override + public Ensembleable<byte[]> inBackground(BackgroundCallback callback) + { + return GetConfigBuilderImpl.this.inBackground(callback); + } + + @Override + public Ensembleable<byte[]> inBackground(BackgroundCallback callback, Object context) + { + return GetConfigBuilderImpl.this.inBackground(callback, context); + } + + @Override + public Ensembleable<byte[]> inBackground(BackgroundCallback callback, Executor executor) + { + return GetConfigBuilderImpl.this.inBackground(callback, executor); + } + + @Override + public Ensembleable<byte[]> inBackground(BackgroundCallback callback, Object context, Executor executor) + { + return GetConfigBuilderImpl.this.inBackground(callback, context, executor); + } + + @Override + public byte[] forEnsemble() throws Exception + { + return GetConfigBuilderImpl.this.forEnsemble(); + } + } } http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java index 0efa481..832272b 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java @@ -34,10 +34,8 @@ import java.util.concurrent.Executor; public class ReconfigBuilderImpl implements ReconfigBuilder, ReconfigBuilderMain, - StatEnsembleable<byte[]>, - Configurable, - StatConfigEnsembleable, - BackgroundOperation<Void> + ConfigureEnsembleable, + BackgroundOperation<Void>,Statable<Ensembleable<byte[]>>,Ensembleable<byte[]> { private final CuratorFrameworkImpl client; @@ -75,7 +73,7 @@ public class ReconfigBuilderImpl implements } @Override - public StatEnsembleable<byte[]> fromConfig(long config) throws Exception + public Ensembleable<byte[]> fromConfig(long config) throws Exception { fromConfig = config; return this; @@ -101,13 +99,26 @@ public class ReconfigBuilderImpl implements } @Override - public Ensembleable<byte[]> storingStatIn(Stat stat) + public ConfigureEnsembleable storingStatIn(Stat stat) { - return ReconfigBuilderImpl.this.storingStatIn(stat); + return new ConfigureEnsembleable() + { + @Override + public Ensembleable<byte[]> fromConfig(long config) throws Exception + { + return ReconfigBuilderImpl.this.fromConfig(config); + } + + @Override + public byte[] forEnsemble() throws Exception + { + return ReconfigBuilderImpl.this.forEnsemble(); + } + }; } @Override - public StatEnsembleable<byte[]> fromConfig(long config) throws Exception + public Ensembleable<byte[]> fromConfig(long config) throws Exception { return ReconfigBuilderImpl.this.fromConfig(config); } @@ -130,25 +141,25 @@ public class ReconfigBuilderImpl implements } @Override - public Ensembleable<byte[]> storingStatIn(Stat stat) + public ConfigureEnsembleable storingStatIn(Stat stat) { - return ReconfigBuilderImpl.this.storingStatIn(stat); + return new InternalConfigureEnsembleable(); } @Override - public StatEnsembleable<byte[]> fromConfig(long config) throws Exception + public Ensembleable<byte[]> fromConfig(long config) throws Exception { return ReconfigBuilderImpl.this.fromConfig(config); } @Override - public StatConfigEnsembleable leaving(List<String> servers) + public Statable<ConfigureEnsembleable> leaving(List<String> servers) { return ReconfigBuilderImpl.this.leaving(servers); } @Override - public StatConfigEnsembleable leaving(String... server) + public Statable<ConfigureEnsembleable> leaving(String... server) { return ReconfigBuilderImpl.this.leaving(server); } @@ -173,25 +184,25 @@ public class ReconfigBuilderImpl implements } @Override - public Ensembleable<byte[]> storingStatIn(Stat stat) + public ConfigureEnsembleable storingStatIn(Stat stat) { - return ReconfigBuilderImpl.this.storingStatIn(stat); + return new InternalConfigureEnsembleable(); } @Override - public StatEnsembleable<byte[]> fromConfig(long config) throws Exception + public Ensembleable<byte[]> fromConfig(long config) throws Exception { return ReconfigBuilderImpl.this.fromConfig(config); } @Override - public StatConfigEnsembleable joining(List<String> servers) + public ConfigureEnsembleable joining(List<String> servers) { return ReconfigBuilderImpl.this.joining(servers); } @Override - public StatConfigEnsembleable joining(String... server) + public ConfigureEnsembleable joining(String... server) { return ReconfigBuilderImpl.this.joining(server); } @@ -262,13 +273,13 @@ public class ReconfigBuilderImpl implements } @Override - public Ensembleable<byte[]> storingStatIn(Stat stat) + public ConfigureEnsembleable storingStatIn(Stat stat) { - return ReconfigBuilderImpl.this.storingStatIn(stat); + return new InternalConfigureEnsembleable(); } @Override - public StatEnsembleable<byte[]> fromConfig(long config) throws Exception + public Ensembleable<byte[]> fromConfig(long config) throws Exception { return ReconfigBuilderImpl.this.fromConfig(config); } @@ -291,25 +302,25 @@ public class ReconfigBuilderImpl implements } @Override - public Ensembleable<byte[]> storingStatIn(Stat stat) + public ConfigureEnsembleable storingStatIn(Stat stat) { - return ReconfigBuilderImpl.this.storingStatIn(stat); + return new InternalConfigureEnsembleable(); } @Override - public StatEnsembleable<byte[]> fromConfig(long config) throws Exception + public Ensembleable<byte[]> fromConfig(long config) throws Exception { return ReconfigBuilderImpl.this.fromConfig(config); } @Override - public StatConfigEnsembleable leaving(List<String> servers) + public Statable<ConfigureEnsembleable> leaving(List<String> servers) { return ReconfigBuilderImpl.this.leaving(servers); } @Override - public StatConfigEnsembleable leaving(String... server) + public Statable<ConfigureEnsembleable> leaving(String... server) { return ReconfigBuilderImpl.this.leaving(server); } @@ -334,25 +345,25 @@ public class ReconfigBuilderImpl implements } @Override - public Ensembleable<byte[]> storingStatIn(Stat stat) + public ConfigureEnsembleable storingStatIn(Stat stat) { - return ReconfigBuilderImpl.this.storingStatIn(stat); + return new InternalConfigureEnsembleable(); } @Override - public StatEnsembleable<byte[]> fromConfig(long config) throws Exception + public ConfigureEnsembleable fromConfig(long config) throws Exception { - return ReconfigBuilderImpl.this.fromConfig(config); + return new InternalConfigureEnsembleable(); } @Override - public StatConfigEnsembleable adding(List<String> servers) + public Statable<ConfigureEnsembleable> adding(List<String> servers) { return ReconfigBuilderImpl.this.adding(servers); } @Override - public StatConfigEnsembleable adding(String... server) + public Statable<ConfigureEnsembleable> adding(String... server) { return ReconfigBuilderImpl.this.adding(server); } @@ -381,13 +392,13 @@ public class ReconfigBuilderImpl implements } @Override - public Ensembleable<byte[]> storingStatIn(Stat stat) + public ConfigureEnsembleable storingStatIn(Stat stat) { - return ReconfigBuilderImpl.this.storingStatIn(stat); + return new InternalConfigureEnsembleable(); } @Override - public StatEnsembleable<byte[]> fromConfig(long config) throws Exception + public Ensembleable<byte[]> fromConfig(long config) throws Exception { return ReconfigBuilderImpl.this.fromConfig(config); } @@ -404,13 +415,13 @@ public class ReconfigBuilderImpl implements return new JoinStatConfigurable() { @Override - public Configurable joining(List<String> servers) + public ConfigureEnsembleable joining(List<String> servers) { return ReconfigBuilderImpl.this.joining(servers); } @Override - public Configurable joining(String... server) + public ConfigureEnsembleable joining(String... server) { return ReconfigBuilderImpl.this.joining(server); } @@ -435,25 +446,25 @@ public class ReconfigBuilderImpl implements } @Override - public Ensembleable<byte[]> storingStatIn(Stat stat) + public ConfigureEnsembleable storingStatIn(Stat stat) { - return ReconfigBuilderImpl.this.storingStatIn(stat); + return new InternalConfigureEnsembleable(); } @Override - public StatEnsembleable<byte[]> fromConfig(long config) throws Exception + public Ensembleable<byte[]> fromConfig(long config) throws Exception { return ReconfigBuilderImpl.this.fromConfig(config); } @Override - public StatConfigEnsembleable adding(List<String> servers) + public Statable<ConfigureEnsembleable> adding(List<String> servers) { return ReconfigBuilderImpl.this.adding(servers); } @Override - public StatConfigEnsembleable adding(String... server) + public Statable<ConfigureEnsembleable> adding(String... server) { return ReconfigBuilderImpl.this.adding(server); } @@ -501,4 +512,19 @@ public class ReconfigBuilderImpl implements trace.commit(); return responseData; } + + private class InternalConfigureEnsembleable implements ConfigureEnsembleable + { + @Override + public Ensembleable<byte[]> fromConfig(long config) throws Exception + { + return ReconfigBuilderImpl.this.fromConfig(config); + } + + @Override + public byte[] forEnsemble() throws Exception + { + return ReconfigBuilderImpl.this.forEnsemble(); + } + } } http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java index 2438ef8..d4c89be 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java @@ -19,400 +19,184 @@ package org.apache.curator.framework.imps; -import org.apache.curator.ensemble.EnsembleListener; -import org.apache.curator.ensemble.dynamic.DynamicEnsembleProvider; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.ensemble.EnsembleTracker; import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingCluster; -import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.quorum.QuorumPeer; import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.io.IOException; -import java.io.StringReader; -import java.util.HashMap; -import java.util.Map; +import java.io.ByteArrayInputStream; import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; -public class TestReconfiguration +public class TestReconfiguration extends BaseClassForTests { - private static final Timing timing = new Timing(); private TestingCluster cluster; - private DynamicEnsembleProvider dynamicEnsembleProvider; - private WaitOnDelegateListener waitOnDelegateListener; - private EnsembleTracker ensembleTracker; - private CuratorFramework client; - - private String connectionString1to5; - private String connectionString2to5; - private String connectionString3to5; @BeforeMethod + @Override public void setup() throws Exception { - cluster = new TestingCluster(5); - cluster.start(); - - connectionString1to5 = cluster.getConnectString(); - connectionString2to5 = getConnectionString(cluster, 2, 3, 4, 5); - connectionString3to5 = getConnectionString(cluster, 3, 4, 5); + super.setup(); - dynamicEnsembleProvider = new DynamicEnsembleProvider(connectionString1to5); - client = CuratorFrameworkFactory.builder() - .ensembleProvider(dynamicEnsembleProvider) - .retryPolicy(new RetryOneTime(1)) - .build(); - client.start(); - client.blockUntilConnected(); - - //Wrap around the dynamic ensemble provider, so that we can wait until it has received the event. - waitOnDelegateListener = new WaitOnDelegateListener(dynamicEnsembleProvider); - ensembleTracker = new EnsembleTracker(client); - ensembleTracker.getListenable().addListener(waitOnDelegateListener); - ensembleTracker.start(); - //Wait for the initial event. - waitOnDelegateListener.waitForEvent(); + CloseableUtils.closeQuietly(server); + server = null; + cluster = new TestingCluster(3); + cluster.start(); } @AfterMethod - public void tearDown() throws IOException + @Override + public void teardown() throws Exception { - CloseableUtils.closeQuietly(ensembleTracker); - CloseableUtils.closeQuietly(client); CloseableUtils.closeQuietly(cluster); - } - - @Test - public void testSyncIncremental() throws Exception - { - Stat stat = new Stat(); - byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble(); - Assert.assertNotNull(bytes); - QuorumVerifier qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 5); - String server1 = getServerString(qv, cluster, 1L); - String server2 = getServerString(qv, cluster, 2L); - - //Remove Servers - bytes = client.reconfig().leaving("1").fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble(); - qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 4); - - waitOnDelegateListener.waitForEvent(); - Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); - bytes = client.reconfig().leaving("2").fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble(); - qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 3); - - waitOnDelegateListener.waitForEvent(); - Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5); - - //Add Servers - bytes = client.reconfig().joining("server.2=" + server2).fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble(); - qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 4); - - waitOnDelegateListener.waitForEvent(); - Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); - - bytes = client.reconfig().joining("server.1=" + server1).fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble(); - qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 5); - - waitOnDelegateListener.waitForEvent(); - Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5); + super.teardown(); } - @Test - public void testAsyncIncremental() throws Exception + @SuppressWarnings("ConstantConditions") + public void testApiPermutations() throws Exception { - final AtomicReference<byte[]> bytes = new AtomicReference<>(); - final BackgroundCallback callback = new BackgroundCallback() - { - @Override - public void processResult(CuratorFramework client, CuratorEvent event) throws Exception - { - bytes.set(event.getData()); - //We only need the latch on getConfig. - if ( event.getContext() != null ) - { - ((CountDownLatch)event.getContext()).countDown(); - } - } - - }; - - CountDownLatch latch = new CountDownLatch(1); - client.getConfig().inBackground(callback, latch).forEnsemble(); - Assert.assertTrue(timing.awaitLatch(latch)); - Assert.assertNotNull(bytes.get()); - QuorumVerifier qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 5); - String server1 = getServerString(qv, cluster, 1L); - String server2 = getServerString(qv, cluster, 2L); - - //Remove Servers - client.reconfig().inBackground(callback).leaving("1").fromConfig(qv.getVersion()).forEnsemble(); - waitOnDelegateListener.waitForEvent(); - Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); - qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 4); - - client.reconfig().inBackground(callback, latch).leaving("2").fromConfig(qv.getVersion()).forEnsemble(); - waitOnDelegateListener.waitForEvent(); - Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5); - qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 3); - - //Add Servers - client.reconfig().inBackground(callback, latch).joining("server.2=" + server2).fromConfig(qv.getVersion()).forEnsemble(); - waitOnDelegateListener.waitForEvent(); - Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); - qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 4); - - client.reconfig().inBackground(callback, latch).joining("server.1=" + server1).fromConfig(qv.getVersion()).forEnsemble(); - waitOnDelegateListener.waitForEvent(); - Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5); - qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 5); - } - - @Test - public void testSyncNonIncremental() throws Exception - { - Stat stat = new Stat(); - byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble(); - Assert.assertNotNull(bytes); - QuorumVerifier qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 5); - String server1 = getServerString(qv, cluster, 1L); - String server2 = getServerString(qv, cluster, 2L); - String server3 = getServerString(qv, cluster, 3L); - String server4 = getServerString(qv, cluster, 4L); - String server5 = getServerString(qv, cluster, 5L); - - //Remove Servers - bytes = client.reconfig() - .adding("server.2=" + server2, - "server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble(); - qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 4); - - waitOnDelegateListener.waitForEvent(); - Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); - - bytes = client.reconfig() - .adding("server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble(); - - qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 3); - - waitOnDelegateListener.waitForEvent(); - Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5); - - //Add Servers - bytes = client.reconfig() - .adding("server.2=" + server2, - "server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble(); - qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 4); - - waitOnDelegateListener.waitForEvent(); - Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); - - bytes = client.reconfig() - .adding("server.1=" + server1, - "server.2=" + server2, - "server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble(); - qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 5); - - waitOnDelegateListener.waitForEvent(); - Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5); + // not an actual test. Specifies all possible API possibilities + + Watcher watcher = null; + Stat stat = null; + CuratorFramework client = null; + + client.getConfig().forEnsemble(); + client.getConfig().inBackground().forEnsemble(); + client.getConfig().usingWatcher(watcher).forEnsemble(); + client.getConfig().usingWatcher(watcher).inBackground().forEnsemble(); + client.getConfig().storingStatIn(stat).forEnsemble(); + client.getConfig().storingStatIn(stat).inBackground().forEnsemble(); + client.getConfig().storingStatIn(stat).usingWatcher(watcher).forEnsemble(); + client.getConfig().storingStatIn(stat).usingWatcher(watcher).inBackground().forEnsemble(); + + // --------- + + client.reconfig().adding().forEnsemble(); + client.reconfig().leaving().forEnsemble(); + client.reconfig().joining().forEnsemble(); + client.reconfig().adding().leaving().forEnsemble(); + client.reconfig().adding().joining().forEnsemble(); + client.reconfig().leaving().joining().forEnsemble(); + + client.reconfig().adding().fromConfig(0).forEnsemble(); + client.reconfig().leaving().fromConfig(0).forEnsemble(); + client.reconfig().joining().fromConfig(0).forEnsemble(); + client.reconfig().adding().leaving().fromConfig(0).forEnsemble(); + client.reconfig().adding().joining().fromConfig(0).forEnsemble(); + client.reconfig().leaving().joining().fromConfig(0).forEnsemble(); + + client.reconfig().adding().fromConfig(0).forEnsemble(); + client.reconfig().leaving().fromConfig(0).forEnsemble(); + client.reconfig().joining().fromConfig(0).forEnsemble(); + client.reconfig().adding().leaving().fromConfig(0).forEnsemble(); + client.reconfig().adding().joining().fromConfig(0).forEnsemble(); + client.reconfig().leaving().joining().fromConfig(0).forEnsemble(); + + client.reconfig().adding().storingStatIn(stat).forEnsemble(); + client.reconfig().leaving().storingStatIn(stat).forEnsemble(); + client.reconfig().joining().storingStatIn(stat).forEnsemble(); + client.reconfig().adding().leaving().storingStatIn(stat).forEnsemble(); + client.reconfig().adding().joining().storingStatIn(stat).forEnsemble(); + client.reconfig().leaving().joining().storingStatIn(stat).forEnsemble(); + + client.reconfig().adding().storingStatIn(stat).fromConfig(0).forEnsemble(); + client.reconfig().leaving().storingStatIn(stat).fromConfig(0).forEnsemble(); + client.reconfig().joining().storingStatIn(stat).fromConfig(0).forEnsemble(); + client.reconfig().adding().leaving().storingStatIn(stat).fromConfig(0).forEnsemble(); + client.reconfig().adding().joining().storingStatIn(stat).fromConfig(0).forEnsemble(); + client.reconfig().leaving().joining().storingStatIn(stat).fromConfig(0).forEnsemble(); + + client.reconfig().inBackground().adding().forEnsemble(); + client.reconfig().inBackground().leaving().forEnsemble(); + client.reconfig().inBackground().joining().forEnsemble(); + client.reconfig().inBackground().adding().leaving().forEnsemble(); + client.reconfig().inBackground().adding().joining().forEnsemble(); + client.reconfig().inBackground().leaving().joining().forEnsemble(); + + client.reconfig().inBackground().adding().fromConfig(0).forEnsemble(); + client.reconfig().inBackground().leaving().fromConfig(0).forEnsemble(); + client.reconfig().inBackground().joining().fromConfig(0).forEnsemble(); + client.reconfig().inBackground().adding().leaving().fromConfig(0).forEnsemble(); + client.reconfig().inBackground().adding().joining().fromConfig(0).forEnsemble(); + client.reconfig().inBackground().leaving().joining().fromConfig(0).forEnsemble(); + + client.reconfig().inBackground().adding().fromConfig(0).forEnsemble(); + client.reconfig().inBackground().leaving().fromConfig(0).forEnsemble(); + client.reconfig().inBackground().joining().fromConfig(0).forEnsemble(); + client.reconfig().inBackground().adding().leaving().fromConfig(0).forEnsemble(); + client.reconfig().inBackground().adding().joining().fromConfig(0).forEnsemble(); + client.reconfig().inBackground().leaving().joining().fromConfig(0).forEnsemble(); + + client.reconfig().inBackground().adding().storingStatIn(stat).forEnsemble(); + client.reconfig().inBackground().leaving().storingStatIn(stat).forEnsemble(); + client.reconfig().inBackground().joining().storingStatIn(stat).forEnsemble(); + client.reconfig().inBackground().adding().leaving().storingStatIn(stat).forEnsemble(); + client.reconfig().inBackground().adding().joining().storingStatIn(stat).forEnsemble(); + client.reconfig().inBackground().leaving().joining().storingStatIn(stat).forEnsemble(); + + client.reconfig().inBackground().adding().storingStatIn(stat).fromConfig(0).forEnsemble(); + client.reconfig().inBackground().leaving().storingStatIn(stat).fromConfig(0).forEnsemble(); + client.reconfig().inBackground().joining().storingStatIn(stat).fromConfig(0).forEnsemble(); + client.reconfig().inBackground().adding().leaving().storingStatIn(stat).fromConfig(0).forEnsemble(); + client.reconfig().inBackground().adding().joining().storingStatIn(stat).fromConfig(0).forEnsemble(); + client.reconfig().inBackground().leaving().joining().storingStatIn(stat).fromConfig(0).forEnsemble(); } @Test - public void testAsyncNonIncremental() throws Exception + public void testBasicGetConfig() throws Exception { - final AtomicReference<byte[]> bytes = new AtomicReference<>(); - final BackgroundCallback callback = new BackgroundCallback() + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1)) ) { - @Override - public void processResult(CuratorFramework client, CuratorEvent event) throws Exception - { - bytes.set(event.getData()); - ((CountDownLatch)event.getContext()).countDown(); - } + client.start(); + QuorumVerifier quorumVerifier = toQuorumVerifier(client.getConfig().forEnsemble()); + System.out.println(quorumVerifier); - }; - - CountDownLatch latch = new CountDownLatch(1); - client.getConfig().inBackground(callback, latch).forEnsemble(); - Assert.assertTrue(timing.awaitLatch(latch)); - Assert.assertNotNull(bytes.get()); - QuorumVerifier qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 5); - String server1 = getServerString(qv, cluster, 1L); - String server2 = getServerString(qv, cluster, 2L); - String server3 = getServerString(qv, cluster, 3L); - String server4 = getServerString(qv, cluster, 4L); - String server5 = getServerString(qv, cluster, 5L); - - //Remove Servers - client.reconfig().inBackground(callback, latch) - .adding("server.2=" + server2, - "server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .fromConfig(qv.getVersion()).forEnsemble(); - waitOnDelegateListener.waitForEvent(); - Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); - qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 4); - - client.reconfig().inBackground(callback, latch) - .adding("server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .fromConfig(qv.getVersion()).forEnsemble(); - waitOnDelegateListener.waitForEvent(); - Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5); - qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 3); - - //Add Servers - client.reconfig().inBackground(callback, latch) - .adding("server.2=" + server2, - "server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .fromConfig(qv.getVersion()).forEnsemble(); - waitOnDelegateListener.waitForEvent(); - Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); - qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 4); - - client.reconfig().inBackground(callback, latch) - .adding("server.1=" + server1, - "server.2=" + server2, - "server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .fromConfig(qv.getVersion()).forEnsemble(); - waitOnDelegateListener.waitForEvent(); - Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5); - qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 5); - } - - static QuorumVerifier getQuorumVerifier(byte[] bytes) throws Exception - { - Properties properties = new Properties(); - properties.load(new StringReader(new String(bytes))); - return new QuorumMaj(properties); - } - - static InstanceSpec getInstance(TestingCluster cluster, int id) - { - for ( InstanceSpec spec : cluster.getInstances() ) - { - if ( spec.getServerId() == id ) + for ( InstanceSpec instance : cluster.getInstances() ) { - return spec; + QuorumPeer.QuorumServer quorumServer = quorumVerifier.getAllMembers().get((long)instance.getServerId()); + Assert.assertNotNull(quorumServer); + Assert.assertEquals(quorumServer.clientAddr.getPort(), instance.getPort()); } } - throw new IllegalStateException("InstanceSpec with id:" + id + " not found"); } - static String getServerString(QuorumVerifier qv, TestingCluster cluster, long id) throws Exception + @Test + public void testAdd1Sync() throws Exception { - String str = qv.getAllMembers().get(id).toString(); - //check if connection string is already there. - if ( str.contains(";") ) - { - return str; - } - else + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1)) ) { - return str + ";" + getInstance(cluster, (int)id).getConnectString(); - } - } + client.start(); - static String getConnectionString(TestingCluster cluster, long... ids) throws Exception - { - StringBuilder sb = new StringBuilder(); - Map<Long, InstanceSpec> specs = new HashMap<>(); - for ( InstanceSpec spec : cluster.getInstances() ) - { - specs.put((long)spec.getServerId(), spec); - } - for ( long id : ids ) - { - if ( sb.length() != 0 ) + Watcher watcher = new Watcher() { - sb.append(","); - } - sb.append(specs.get(id).getConnectString()); + @Override + public void process(WatchedEvent event) + { + + } + }; + client.getConfig().usingWatcher(watcher).forEnsemble(); } - return sb.toString(); } - //Simple EnsembleListener that can wait until the delegate handles the event. - private static class WaitOnDelegateListener implements EnsembleListener + private static QuorumVerifier toQuorumVerifier(byte[] bytes) throws Exception { - private CountDownLatch latch = new CountDownLatch(1); - - private final EnsembleListener delegate; - - private WaitOnDelegateListener(EnsembleListener delegate) - { - this.delegate = delegate; - } - - @Override - public void connectionStringUpdated(String connectionString) - { - delegate.connectionStringUpdated(connectionString); - latch.countDown(); - } - - public void waitForEvent() throws InterruptedException, TimeoutException - { - if ( timing.awaitLatch(latch) ) - { - latch = new CountDownLatch(1); - } - else - { - throw new TimeoutException("Failed to receive event in time."); - } - } + Assert.assertNotNull(bytes); + Properties properties = new Properties(); + properties.load(new ByteArrayInputStream(bytes)); + return new QuorumMaj(properties); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/curator/blob/2c0fca86/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfigurationX.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfigurationX.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfigurationX.java new file mode 100644 index 0000000..2268055 --- /dev/null +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfigurationX.java @@ -0,0 +1,418 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.imps; + +import org.apache.curator.ensemble.EnsembleListener; +import org.apache.curator.ensemble.dynamic.DynamicEnsembleProvider; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.ensemble.EnsembleTracker; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingCluster; +import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import java.io.IOException; +import java.io.StringReader; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +public class TestReconfigurationX +{ + private static final Timing timing = new Timing(); + private TestingCluster cluster; + private DynamicEnsembleProvider dynamicEnsembleProvider; + private WaitOnDelegateListener waitOnDelegateListener; + private EnsembleTracker ensembleTracker; + private CuratorFramework client; + + private String connectionString1to5; + private String connectionString2to5; + private String connectionString3to5; + + @BeforeMethod + public void setup() throws Exception + { + cluster = new TestingCluster(5); + cluster.start(); + + connectionString1to5 = cluster.getConnectString(); + connectionString2to5 = getConnectionString(cluster, 2, 3, 4, 5); + connectionString3to5 = getConnectionString(cluster, 3, 4, 5); + + dynamicEnsembleProvider = new DynamicEnsembleProvider(connectionString1to5); + client = CuratorFrameworkFactory.builder() + .ensembleProvider(dynamicEnsembleProvider) + .retryPolicy(new RetryOneTime(1)) + .build(); + client.start(); + client.blockUntilConnected(); + + //Wrap around the dynamic ensemble provider, so that we can wait until it has received the event. + waitOnDelegateListener = new WaitOnDelegateListener(dynamicEnsembleProvider); + ensembleTracker = new EnsembleTracker(client); + ensembleTracker.getListenable().addListener(waitOnDelegateListener); + ensembleTracker.start(); + //Wait for the initial event. + waitOnDelegateListener.waitForEvent(); + } + + @AfterMethod + public void tearDown() throws IOException + { + CloseableUtils.closeQuietly(ensembleTracker); + CloseableUtils.closeQuietly(client); + CloseableUtils.closeQuietly(cluster); + } + + @Test + public void testSyncIncremental() throws Exception + { + Stat stat = new Stat(); + byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble(); + Assert.assertNotNull(bytes); + QuorumVerifier qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 5); + String server1 = getServerString(qv, cluster, 1L); + String server2 = getServerString(qv, cluster, 2L); + + //Remove Servers + bytes = client.reconfig().leaving("1").storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 4); + + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); + + bytes = client.reconfig().leaving("2").storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 3); + + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5); + + //Add Servers + bytes = client.reconfig().joining("server.2=" + server2).storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 4); + + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); + + bytes = client.reconfig().joining("server.1=" + server1).storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 5); + + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5); + } + + @Test + public void testAsyncIncremental() throws Exception + { + final AtomicReference<byte[]> bytes = new AtomicReference<>(); + final BackgroundCallback callback = new BackgroundCallback() + { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + bytes.set(event.getData()); + //We only need the latch on getConfig. + if ( event.getContext() != null ) + { + ((CountDownLatch)event.getContext()).countDown(); + } + } + + }; + + CountDownLatch latch = new CountDownLatch(1); + client.getConfig().inBackground(callback, latch).forEnsemble(); + Assert.assertTrue(timing.awaitLatch(latch)); + Assert.assertNotNull(bytes.get()); + QuorumVerifier qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 5); + String server1 = getServerString(qv, cluster, 1L); + String server2 = getServerString(qv, cluster, 2L); + + //Remove Servers + client.reconfig().inBackground(callback).leaving("1").fromConfig(qv.getVersion()).forEnsemble(); + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 4); + + client.reconfig().inBackground(callback, latch).leaving("2").fromConfig(qv.getVersion()).forEnsemble(); + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 3); + + //Add Servers + client.reconfig().inBackground(callback, latch).joining("server.2=" + server2).fromConfig(qv.getVersion()).forEnsemble(); + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 4); + + client.reconfig().inBackground(callback, latch).joining("server.1=" + server1).fromConfig(qv.getVersion()).forEnsemble(); + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 5); + } + + @Test + public void testSyncNonIncremental() throws Exception + { + Stat stat = new Stat(); + byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble(); + Assert.assertNotNull(bytes); + QuorumVerifier qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 5); + String server1 = getServerString(qv, cluster, 1L); + String server2 = getServerString(qv, cluster, 2L); + String server3 = getServerString(qv, cluster, 3L); + String server4 = getServerString(qv, cluster, 4L); + String server5 = getServerString(qv, cluster, 5L); + + //Remove Servers + bytes = client.reconfig() + .adding("server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 4); + + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); + + bytes = client.reconfig() + .adding("server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); + + qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 3); + + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5); + + //Add Servers + bytes = client.reconfig() + .adding("server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 4); + + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); + + bytes = client.reconfig() + .adding("server.1=" + server1, + "server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 5); + + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5); + } + + @Test + public void testAsyncNonIncremental() throws Exception + { + final AtomicReference<byte[]> bytes = new AtomicReference<>(); + final BackgroundCallback callback = new BackgroundCallback() + { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + bytes.set(event.getData()); + ((CountDownLatch)event.getContext()).countDown(); + } + + }; + + CountDownLatch latch = new CountDownLatch(1); + client.getConfig().inBackground(callback, latch).forEnsemble(); + Assert.assertTrue(timing.awaitLatch(latch)); + Assert.assertNotNull(bytes.get()); + QuorumVerifier qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 5); + String server1 = getServerString(qv, cluster, 1L); + String server2 = getServerString(qv, cluster, 2L); + String server3 = getServerString(qv, cluster, 3L); + String server4 = getServerString(qv, cluster, 4L); + String server5 = getServerString(qv, cluster, 5L); + + //Remove Servers + client.reconfig().inBackground(callback, latch) + .adding("server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .fromConfig(qv.getVersion()).forEnsemble(); + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 4); + + client.reconfig().inBackground(callback, latch) + .adding("server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .fromConfig(qv.getVersion()).forEnsemble(); + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 3); + + //Add Servers + client.reconfig().inBackground(callback, latch) + .adding("server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .fromConfig(qv.getVersion()).forEnsemble(); + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 4); + + client.reconfig().inBackground(callback, latch) + .adding("server.1=" + server1, + "server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .fromConfig(qv.getVersion()).forEnsemble(); + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 5); + } + + static QuorumVerifier getQuorumVerifier(byte[] bytes) throws Exception + { + Properties properties = new Properties(); + properties.load(new StringReader(new String(bytes))); + return new QuorumMaj(properties); + } + + static InstanceSpec getInstance(TestingCluster cluster, int id) + { + for ( InstanceSpec spec : cluster.getInstances() ) + { + if ( spec.getServerId() == id ) + { + return spec; + } + } + throw new IllegalStateException("InstanceSpec with id:" + id + " not found"); + } + + static String getServerString(QuorumVerifier qv, TestingCluster cluster, long id) throws Exception + { + String str = qv.getAllMembers().get(id).toString(); + //check if connection string is already there. + if ( str.contains(";") ) + { + return str; + } + else + { + return str + ";" + getInstance(cluster, (int)id).getConnectString(); + } + } + + static String getConnectionString(TestingCluster cluster, long... ids) throws Exception + { + StringBuilder sb = new StringBuilder(); + Map<Long, InstanceSpec> specs = new HashMap<>(); + for ( InstanceSpec spec : cluster.getInstances() ) + { + specs.put((long)spec.getServerId(), spec); + } + for ( long id : ids ) + { + if ( sb.length() != 0 ) + { + sb.append(","); + } + sb.append(specs.get(id).getConnectString()); + } + return sb.toString(); + } + + //Simple EnsembleListener that can wait until the delegate handles the event. + private static class WaitOnDelegateListener implements EnsembleListener + { + private CountDownLatch latch = new CountDownLatch(1); + + private final EnsembleListener delegate; + + private WaitOnDelegateListener(EnsembleListener delegate) + { + this.delegate = delegate; + } + + @Override + public void connectionStringUpdated(String connectionString) + { + delegate.connectionStringUpdated(connectionString); + latch.countDown(); + } + + public void waitForEvent() throws InterruptedException, TimeoutException + { + if ( timing.awaitLatch(latch) ) + { + latch = new CountDownLatch(1); + } + else + { + throw new TimeoutException("Failed to receive event in time."); + } + } + } +} \ No newline at end of file