wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/4465fe0f Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/4465fe0f Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/4465fe0f Branch: refs/heads/curator-rpc Commit: 4465fe0f5ac3b7f7c2287980bc114ece9d635f16 Parents: 530010d Author: randgalt <randg...@apache.org> Authored: Thu May 29 10:38:52 2014 -0500 Committer: randgalt <randg...@apache.org> Committed: Thu May 29 10:38:52 2014 -0500 ---------------------------------------------------------------------- .../x/rpc/details/RpcBackgroundCallback.java | 26 + .../curator/x/rpc/details/RpcWatcher.java | 25 + .../x/rpc/idl/event/RpcCuratorEvent.java | 16 +- .../projection/CuratorProjectionService.java | 100 +- .../x/rpc/idl/projection/GetDataSpec.java | 50 + .../x/rpc/idl/projection/SetDataSpec.java | 58 + .../curator/x/rpc/idl/projection/Version.java | 20 + curator-x-rpc/src/main/thrift/curator.thrift | 24 +- .../apache/curator/generated/CuratorEvent.java | 1 + .../curator/generated/CuratorService.java | 2300 ++++++++++++++++-- .../apache/curator/generated/GetDataSpec.java | 680 ++++++ .../apache/curator/generated/SetDataSpec.java | 896 +++++++ .../org/apache/curator/generated/Version.java | 386 +++ .../org/apache/curator/x/rpc/TestClient.java | 25 +- 14 files changed, 4385 insertions(+), 222 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/4465fe0f/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/details/RpcBackgroundCallback.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/details/RpcBackgroundCallback.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/details/RpcBackgroundCallback.java new file mode 100644 index 0000000..22a3fd2 --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/details/RpcBackgroundCallback.java @@ -0,0 +1,26 @@ +package org.apache.curator.x.rpc.details; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent; +import org.apache.curator.x.rpc.idl.projection.CuratorProjection; +import org.apache.curator.x.rpc.idl.projection.CuratorProjectionService; + +public class RpcBackgroundCallback implements BackgroundCallback +{ + private final CuratorProjection projection; + private final CuratorProjectionService projectionService; + + public RpcBackgroundCallback(CuratorProjectionService projectionService, CuratorProjection projection) + { + this.projection = projection; + this.projectionService = projectionService; + } + + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + projectionService.addEvent(projection, new RpcCuratorEvent(event)); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/4465fe0f/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/details/RpcWatcher.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/details/RpcWatcher.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/details/RpcWatcher.java new file mode 100644 index 0000000..55dd866 --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/details/RpcWatcher.java @@ -0,0 +1,25 @@ +package org.apache.curator.x.rpc.details; + +import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent; +import org.apache.curator.x.rpc.idl.projection.CuratorProjection; +import org.apache.curator.x.rpc.idl.projection.CuratorProjectionService; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +public class RpcWatcher implements Watcher +{ + private final CuratorProjection projection; + private final CuratorProjectionService projectionService; + + public RpcWatcher(CuratorProjectionService projectionService, CuratorProjection projection) + { + this.projection = projection; + this.projectionService = projectionService; + } + + @Override + public void process(WatchedEvent event) + { + projectionService.addEvent(projection, new RpcCuratorEvent(event)); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/4465fe0f/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java index c6802e4..50bbbf3 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java @@ -108,6 +108,20 @@ public class RpcCuratorEvent this.watchedEvent = null; } + public RpcCuratorEvent(WatchedEvent event) + { + this.type = RpcCuratorEventType.WATCHED; + this.resultCode = 0; + this.path = event.getPath(); + this.context = null; + this.stat = null; + this.data = null; + this.name = null; + this.children = null; + this.aclList = null; + this.watchedEvent = new RpcWatchedEvent(toRpcKeeperState(event.getState()), toRpcEventType(event.getType()), event.getPath()); + } + private RpcCuratorEventType toRpcCuratorEventType(ConnectionState state) { switch ( state ) @@ -203,7 +217,7 @@ public class RpcCuratorEvent throw new IllegalStateException("Unknown type: " + eventType); } - private RpcStat toRpcStat(Stat stat) + public static RpcStat toRpcStat(Stat stat) { if ( stat != null ) { http://git-wip-us.apache.org/repos/asf/curator/blob/4465fe0f/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java index 880f290..add72fa 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java @@ -22,20 +22,18 @@ package org.apache.curator.x.rpc.idl.projection; import com.facebook.swift.service.ThriftMethod; import com.facebook.swift.service.ThriftService; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.Backgroundable; -import org.apache.curator.framework.api.Compressible; -import org.apache.curator.framework.api.CreateBuilder; -import org.apache.curator.framework.api.CreateModable; -import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.api.PathAndBytesable; +import org.apache.curator.framework.api.*; import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.x.rpc.connections.Closer; import org.apache.curator.x.rpc.connections.ConnectionManager; import org.apache.curator.x.rpc.connections.CuratorEntry; +import org.apache.curator.x.rpc.details.RpcBackgroundCallback; +import org.apache.curator.x.rpc.details.RpcWatcher; import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent; +import org.apache.curator.x.rpc.idl.event.RpcStat; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.UUID; @@ -86,42 +84,93 @@ public class CuratorProjectionService } @ThriftMethod - public String create(final CuratorProjection projection, CreateSpec createSpec) throws Exception + public String create(final CuratorProjection projection, CreateSpec spec) throws Exception { CuratorFramework client = getEntry(projection).getClient(); Object builder = client.create(); - if ( createSpec.creatingParentsIfNeeded ) + if ( spec.creatingParentsIfNeeded ) { builder = castBuilder(builder, CreateBuilder.class).creatingParentsIfNeeded(); } - if ( createSpec.compressed ) + if ( spec.compressed ) { builder = castBuilder(builder, Compressible.class).compressed(); } - if ( createSpec.withProtection ) + if ( spec.withProtection ) { builder = castBuilder(builder, CreateBuilder.class).withProtection(); } - if ( createSpec.mode != null ) + if ( spec.mode != null ) { - builder = castBuilder(builder, CreateModable.class).withMode(getRealMode(createSpec.mode)); + builder = castBuilder(builder, CreateModable.class).withMode(getRealMode(spec.mode)); } - if ( createSpec.asyncContext != null ) + if ( spec.asyncContext != null ) { - BackgroundCallback backgroundCallback = new BackgroundCallback() - { - @Override - public void processResult(CuratorFramework client, CuratorEvent event) throws Exception - { - addEvent(projection, new RpcCuratorEvent(event)); - } - }; - builder = castBuilder(builder, Backgroundable.class).inBackground(backgroundCallback, createSpec.asyncContext); + BackgroundCallback backgroundCallback = new RpcBackgroundCallback(this, projection); + builder = castBuilder(builder, Backgroundable.class).inBackground(backgroundCallback, spec.asyncContext); + } + + return String.valueOf(castBuilder(builder, PathAndBytesable.class).forPath(spec.path, spec.data)); + } + + @ThriftMethod + public byte[] getData(final CuratorProjection projection, GetDataSpec spec) throws Exception + { + CuratorFramework client = getEntry(projection).getClient(); + + Object builder = client.getData(); + if ( spec.watched ) + { + builder = castBuilder(builder, Watchable.class).usingWatcher(new RpcWatcher(this, projection)); + } + + if ( spec.decompressed ) + { + builder = castBuilder(builder, Decompressible.class).decompressed(); + } + + if ( spec.asyncContext != null ) + { + BackgroundCallback backgroundCallback = new RpcBackgroundCallback(this, projection); + builder = castBuilder(builder, Backgroundable.class).inBackground(backgroundCallback); + } + + Stat stat = new Stat(); + builder = castBuilder(builder, Statable.class).storingStatIn(stat); + + return (byte[])castBuilder(builder, Pathable.class).forPath(spec.path); + } + + @ThriftMethod + public RpcStat setData(final CuratorProjection projection, SetDataSpec spec) throws Exception + { + CuratorFramework client = getEntry(projection).getClient(); + + Object builder = client.setData(); + if ( spec.watched ) + { + builder = castBuilder(builder, Watchable.class).usingWatcher(new RpcWatcher(this, projection)); + } + if ( spec.version != null ) + { + builder = castBuilder(builder, Versionable.class).withVersion(spec.version.version); + } + + if ( spec.compressed ) + { + builder = castBuilder(builder, Compressible.class).compressed(); } - return String.valueOf(castBuilder(builder, PathAndBytesable.class).forPath(createSpec.path, createSpec.data)); + if ( spec.asyncContext != null ) + { + BackgroundCallback backgroundCallback = new RpcBackgroundCallback(this, projection); + builder = castBuilder(builder, Backgroundable.class).inBackground(backgroundCallback); + } + + Stat stat = (Stat)castBuilder(builder, PathAndBytesable.class).forPath(spec.path, spec.data); + return RpcCuratorEvent.toRpcStat(stat); } @ThriftMethod @@ -163,7 +212,7 @@ public class CuratorProjectionService return new GenericProjection(id); } - private void addEvent(CuratorProjection projection, RpcCuratorEvent event) + public void addEvent(CuratorProjection projection, RpcCuratorEvent event) { CuratorEntry entry = connectionManager.get(projection.id); if ( entry != null ) @@ -217,4 +266,5 @@ public class CuratorProjectionService } throw new Exception("That operation is not available"); // TODO } + } http://git-wip-us.apache.org/repos/asf/curator/blob/4465fe0f/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/GetDataSpec.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/GetDataSpec.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/GetDataSpec.java new file mode 100644 index 0000000..fe5b1ac --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/GetDataSpec.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.rpc.idl.projection; + +import com.facebook.swift.codec.ThriftField; +import com.facebook.swift.codec.ThriftStruct; + +@ThriftStruct +public class GetDataSpec +{ + @ThriftField(1) + public String path; + + @ThriftField(2) + public boolean watched; + + @ThriftField(3) + public String asyncContext; + + @ThriftField(4) + public boolean decompressed; + + public GetDataSpec() + { + } + + public GetDataSpec(String path, boolean watched, String asyncContext, boolean decompressed) + { + this.path = path; + this.watched = watched; + this.asyncContext = asyncContext; + this.decompressed = decompressed; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/4465fe0f/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/SetDataSpec.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/SetDataSpec.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/SetDataSpec.java new file mode 100644 index 0000000..195e620 --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/SetDataSpec.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.rpc.idl.projection; + +import com.facebook.swift.codec.ThriftField; +import com.facebook.swift.codec.ThriftStruct; + +@ThriftStruct +public class SetDataSpec +{ + @ThriftField(1) + public String path; + + @ThriftField(2) + public boolean watched; + + @ThriftField(3) + public String asyncContext; + + @ThriftField(4) + public boolean compressed; + + @ThriftField(5) + public Version version; + + @ThriftField(6) + public byte[] data; + + public SetDataSpec() + { + } + + public SetDataSpec(String path, boolean watched, String asyncContext, boolean compressed, Version version, byte[] data) + { + this.path = path; + this.watched = watched; + this.asyncContext = asyncContext; + this.compressed = compressed; + this.version = version; + this.data = data; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/4465fe0f/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/Version.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/Version.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/Version.java new file mode 100644 index 0000000..172704b --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/Version.java @@ -0,0 +1,20 @@ +package org.apache.curator.x.rpc.idl.projection; + +import com.facebook.swift.codec.ThriftField; +import com.facebook.swift.codec.ThriftStruct; + +@ThriftStruct +public class Version +{ + @ThriftField(1) + public int version; + + public Version() + { + } + + public Version(int version) + { + this.version = version; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/4465fe0f/curator-x-rpc/src/main/thrift/curator.thrift ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/thrift/curator.thrift b/curator-x-rpc/src/main/thrift/curator.thrift index 491753b..dbb3e7c 100644 --- a/curator-x-rpc/src/main/thrift/curator.thrift +++ b/curator-x-rpc/src/main/thrift/curator.thrift @@ -37,6 +37,17 @@ struct GenericProjection { 1: string id; } +struct GetDataSpec { + 1: string path; + 2: bool watched; + 3: string asyncContext; + 4: bool decompressed; +} + +struct Version { + 1: i32 version; +} + struct id { 1: string scheme; 2: string id; @@ -62,6 +73,15 @@ struct WatchedEvent { 3: string path; } +struct SetDataSpec { + 1: string path; + 2: bool watched; + 3: string asyncContext; + 4: bool compressed; + 5: Version version; + 6: binary data; +} + struct Acl { 1: i32 perms; 2: id id; @@ -84,8 +104,10 @@ service CuratorService { GenericProjection acquireLock(1: CuratorProjection projection, 2: string path, 3: i32 maxWaitMs); void closeCuratorProjection(1: CuratorProjection projection); bool closeGenericProjection(1: CuratorProjection curatorProjection, 2: GenericProjection genericProjection); - string create(1: CuratorProjection projection, 2: CreateSpec createSpec); + string create(1: CuratorProjection projection, 2: CreateSpec spec); + binary getData(1: CuratorProjection projection, 2: GetDataSpec spec); CuratorProjection newCuratorProjection(1: string connectionName); + Stat setData(1: CuratorProjection projection, 2: SetDataSpec spec); } service EventService { http://git-wip-us.apache.org/repos/asf/curator/blob/4465fe0f/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEvent.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEvent.java b/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEvent.java index 8674d26..a5da474 100644 --- a/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEvent.java +++ b/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEvent.java @@ -1422,5 +1422,6 @@ public class CuratorEvent implements org.apache.thrift.TBase<CuratorEvent, Curat } } + }