Author: chirino
Date: Fri Dec 31 01:21:08 2010
New Revision: 1054036
URL: http://svn.apache.org/viewvc?rev=1054036&view=rev
Log:
initial pass at implementing store import/export.
Added:
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1054036&r1=1054035&r2=1054036&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
Fri Dec 31 01:21:08 2010
@@ -25,6 +25,8 @@ import collection.mutable.ListBuffer
import org.apache.activemq.apollo.broker.store._
import org.apache.activemq.apollo.util._
import com.sleepycat.je._
+import java.io.{EOFException, InputStream, OutputStream}
+import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessageFactory}
object BDBClient extends Log
/**
@@ -384,4 +386,102 @@ class BDBClient(store: BDBStore) extends
}
}
+ /**
+  * Writes the contents of the store to the supplied streams as framed protobuf
+  * records, in three passes: every queue record, every message record, and then
+  * the entry records of each queue.
+  *
+  * @param streams supplies the output stream used for each record type
+  * @return Success(Zilch) when all three passes complete; otherwise a Failure
+  *         describing the exception that interrupted the export
+  */
+ def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] = {
+   try {
+     with_ctx { ctx=>
+       import ctx._
+       import PBSupport._
+
+       // Pass 1: queue records.
+       streams.using_queue_stream { queue_stream =>
+         queues_db.cursor(tx) { (_, value) =>
+           // The typed val presumably triggers a PBSupport conversion - TODO confirm.
+           val record:QueueRecord = value
+           record.writeFramed(queue_stream)
+           true // presumably tells the cursor to keep iterating - TODO confirm
+         }
+       }
+
+       // Pass 2: message records.
+       streams.using_message_stream { message_stream=>
+         messages_db.cursor(tx) { (_, value) =>
+           val record:MessageRecord = value
+           record.writeFramed(message_stream)
+           true
+         }
+       }
+
+       // Pass 3: for every queue, the records held in that queue's entries db.
+       streams.using_queue_entry_stream { queue_entry_stream=>
+         queues_db.cursor(tx) { (_, queue_value) =>
+           val queue:QueueRecord = queue_value
+           with_entries_db(queue.key) { entries_db=>
+             entries_db.cursor(tx) { (_, entry_value) =>
+               val entry:QueueEntryRecord = entry_value
+               entry.writeFramed(queue_entry_stream)
+               true
+             }
+           }
+           true
+         }
+       }
+
+     }
+     Success(Zilch)
+   } catch {
+     case x:Exception=>
+       // getMessage can be null (e.g. NullPointerException); never report Failure(null).
+       Failure(Option(x.getMessage).getOrElse(x.toString))
+   }
+ }
+
+ /**
+  * Replaces the contents of the store with the framed protobuf records read from
+  * the supplied streams. The store is purged first; queue records, message
+  * records and queue entry records are then loaded in that order.
+  *
+  * @param streams supplies the input stream used for each record type
+  * @return Success(Zilch) when every stream was consumed; otherwise a Failure
+  *         describing the exception that interrupted the import
+  */
+ def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] = {
+   try {
+     purge
+
+     // Parses framed records from `stream` and hands each one to `func` until an
+     // EOFException is raised, which is treated as the normal end of the stream.
+     def foreach[Buffer] (stream:InputStream, fact:PBMessageFactory[_,_])(func: (Buffer)=>Unit):Unit = {
+       var done = false
+       do {
+         try {
+           func(fact.parseFramed(stream).asInstanceOf[Buffer])
+         } catch {
+           case x:EOFException =>
+             done = true
+         }
+       } while( !done )
+     }
+
+     with_ctx { ctx=>
+       import ctx._
+       import PBSupport._
+
+       streams.using_queue_stream { queue_stream=>
+         foreach(queue_stream, QueuePB.FACTORY) { pb=>
+           val record:QueueRecord = pb
+           queues_db.put(tx, record.key, record)
+           // Empty body on purpose; presumably this makes sure the queue's entries
+           // database exists even when the queue has no entries - TODO confirm.
+           with_entries_db(record.key) { entriesdb=>
+           }
+         }
+       }
+
+       streams.using_message_stream { message_stream=>
+         foreach(message_stream, MessagePB.FACTORY) { pb=>
+           val record:MessageRecord = pb
+           messages_db.put(tx, record.key, record)
+         }
+       }
+
+       streams.using_queue_entry_stream { queue_entry_stream=>
+         foreach(queue_entry_stream, QueueEntryPB.FACTORY) { pb=>
+           val record:QueueEntryRecord = pb
+
+           with_entries_db(record.queue_key) { entries_db=>
+             entries_db.put(tx, record.entry_seq, record)
+             // Each imported entry adds 1 to the reference count of its message.
+             add_and_get(message_refs_db, record.message_key, 1, tx)
+           }
+         }
+       }
+     }
+     Success(Zilch)
+
+   } catch {
+     case x:Exception=>
+       // getMessage can be null (e.g. NullPointerException); never report Failure(null).
+       Failure(Option(x.getMessage).getOrElse(x.toString))
+   }
+ }
}
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1054036&r1=1054035&r2=1054036&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
Fri Dec 31 01:21:08 2010
@@ -20,7 +20,6 @@ import dto.{BDBStoreDTO, BDBStoreStatusD
import java.util.concurrent.atomic.AtomicLong
import collection.Seq
import org.fusesource.hawtdispatch._
-import java.io.File
import java.util.concurrent._
import org.apache.activemq.apollo.broker.store._
import org.apache.activemq.apollo.util._
@@ -28,6 +27,8 @@ import ReporterLevel._
import org.fusesource.hawtdispatch.ListEventAggregator
import org.apache.activemq.apollo.dto.{StoreStatusDTO, IntMetricDTO,
TimeMetricDTO, StoreDTO}
import org.apache.activemq.apollo.util.OptionSupport._
+import java.io.{InputStream, OutputStream, File}
+import scala.util.continuations._
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -268,4 +269,21 @@ class BDBStore extends DelayingStoreSupp
callback(rc)
}
+
+ /**
+ * Exports the contents of the store to the provided streams. Each stream should
+ * contain a list of framed protobuf objects with the corresponding object types.
+ * The work is delegated to client.export_pb and submitted through write_executor.
+ */
+ def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String]
@suspendable = write_executor ! {
+ client.export_pb(streams)
+ }
+
+ /**
+ * Imports a previously exported set of streams. This deletes any previous data
+ * in the store.
+ * The work is delegated to client.import_pb and submitted through write_executor.
+ */
+ def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String]
@suspendable = write_executor ! {
+ client.import_pb(streams)
+ }
+
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1054036&r1=1054035&r2=1054036&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
Fri Dec 31 01:21:08 2010
@@ -18,6 +18,14 @@ package org.apache.activemq.apollo.broke
*/
import org.apache.activemq.apollo.dto.{StoreStatusDTO, StoreDTO}
import org.apache.activemq.apollo.util._
+import java.io.{InputStream, OutputStream}
+import scala.util.continuations._
+
+/**
+ * Hands out the streams used by a store export or import, one per record type.
+ * Each method receives a callback and is expected to invoke it with the stream
+ * for that record type.
+ *
+ * A is the stream type: the stores use OutputStream for export and InputStream
+ * for import.
+ */
+trait StreamManager[A] {
+ // Runs func against the stream holding queue records.
+ def using_queue_stream(func: (A)=>Unit)
+ // Runs func against the stream holding message records.
+ def using_message_stream(func: (A)=>Unit)
+ // Runs func against the stream holding queue entry records.
+ def using_queue_entry_stream(func: (A)=>Unit)
+}
/**
* <p>
@@ -109,4 +117,15 @@ trait Store extends ServiceTrait {
*/
def load_message(messageKey:Long)(callback:(Option[MessageRecord])=>Unit )
+ /**
+ * Exports the contents of the store to the provided streams. Each stream should
+ * contain a list of framed protobuf objects with the corresponding object types.
+ *
+ * @param streams supplies the output stream for each record type
+ * @return a Result holding Zilch on success or an error String on failure
+ */
+ def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String]
@suspendable
+
+ /**
+ * Imports a previously exported set of streams. This deletes any previous data
+ * in the store.
+ *
+ * @param streams supplies the input stream for each record type
+ * @return a Result holding Zilch on success or an error String on failure
+ */
+ def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String]
@suspendable
}
\ No newline at end of file
Modified:
activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala?rev=1054036&r1=1054035&r2=1054036&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala
Fri Dec 31 01:21:08 2010
@@ -32,6 +32,8 @@ import org.apache.activemq.apollo.broker
import org.apache.activemq.apollo.util._
import ReporterLevel._
import org.apache.activemq.apollo.util.OptionSupport._
+import java.io.{InputStream, OutputStream}
+import scala.util.continuations._
object CassandraStore extends Log {
@@ -229,5 +231,19 @@ class CassandraStore extends DelayingSto
}
}
+ /**
+ * Export is not implemented for this store: the body always evaluates to
+ * Failure("not supported") and never touches the provided streams.
+ *
+ * NOTE(review): the commented-out call trailing the Failure line passes three
+ * separate streams, which does not match the StreamManager parameter declared here.
+ */
+ def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String]
@suspendable = blocking ! {
+ Failure("not supported")// client.export_pb(queue_stream, message_stream,
queue_entry_stream)
+ }
+ /**
+ * Import is not implemented for this store: the body always evaluates to
+ * Failure("not supported") and never reads the provided streams.
+ *
+ * NOTE(review): the commented-out call trailing the Failure line passes three
+ * separate streams, which does not match the StreamManager parameter declared here.
+ */
+ def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String]
@suspendable = blocking ! {
+ Failure("not supported")//client.import_pb(queue_stream, message_stream,
queue_entry_stream)
+ }
}
Added:
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala?rev=1054036&view=auto
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
(added)
+++
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
Fri Dec 31 01:21:08 2010
@@ -0,0 +1,115 @@
+package org.apache.activemq.apollo.cli.commands
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.felix.gogo.commands.{Action, Option => option, Argument =>
argument, Command => command}
+import org.osgi.service.command.CommandSession
+import org.apache.activemq.apollo.util.FileSupport._
+import org.apache.activemq.apollo.util.OptionSupport._
+import org.apache.commons.codec.binary.Base64
+import java.net.{HttpURLConnection, URL}
+import org.apache.activemq.apollo.broker.{VirtualHost, FileConfigStore}
+import org.apache.activemq.apollo.dto.{VirtualHostDTO, WebAdminDTO}
+import org.apache.activemq.apollo.util.{ServiceControl, Log,
DirectBufferPoolFactory, Logging}
+import java.util.zip.{ZipEntry, ZipOutputStream, ZipFile}
+import org.apache.activemq.apollo.broker.store.{StreamManager, StoreFactory}
+import java.io.{OutputStream, FileOutputStream, File}
+
+object StoreExport extends Log
+
+/**
+ * The apollo store-export command: loads the broker configuration, starts the
+ * message store of the selected virtual host and writes its contents to a zip
+ * file with one entry per record type (queues.dat, messages.dat and
+ * queue_entries.dat).
+ */
+@command(scope="apollo", name = "store-export", description = "exports the contents of a broker message store")
+class StoreExport extends Action {
+
+  @option(name = "--conf", description = "The Apollo configuration file.")
+  var conf: File = _
+
+  @option(name = "--virtual-host", description = "The id of the virtual host to export, if not specified, the default virtual host is selected.")
+  var host: String = _
+
+  @argument(name = "dest", description = "The destination file to hold the exported data", index=0, required=true)
+  var dest:File = _
+
+  /**
+   * Runs the export.
+   *
+   * @param session the command session; not used by this command
+   * @return always null
+   */
+  def execute(session: CommandSession):AnyRef = {
+    import Helper._
+
+    try {
+
+      val base = system_dir("apollo.base")
+
+      // Default to <apollo.base>/etc/apollo.xml when --conf was not given.
+      if( conf == null ) {
+        conf = base / "etc" / "apollo.xml"
+      }
+
+      if( !conf.exists ) {
+        error("Configuration file '%s' does not exist.\n\nTry creating a broker instance using the 'apollo create' command.".format(conf));
+      }
+
+      val config_store = new FileConfigStore
+      config_store.file = conf
+      config_store.start
+      val config = config_store.load(true)
+
+      // Pick the requested virtual host, or the first configured one when no id was given.
+      val hosts = collection.JavaConversions.asScalaIterable(config.virtual_hosts).toArray
+      val vho:Option[VirtualHostDTO] = if( host==null ) {
+        hosts.headOption
+      } else {
+        hosts.find( _.id == host )
+      }
+
+      val vh = vho.getOrElse(error("Could not find host to export"))
+      if( vh.store == null ) {
+        error("The virtual host '%s' does not have a store configured.".format(vh.id))
+      }
+
+      val store = StoreFactory.create(vh.store)
+      if( store==null ) {
+        error("Could not create the store.")
+      }
+
+      // NOTE(review): the store is created from vh.store but configured with
+      // config.store - confirm that this is the intended DTO.
+      store.configure(config.store, LoggingReporter(StoreExport))
+      ServiceControl.start(store, "store startup")
+
+      try {
+        using( new ZipOutputStream(new FileOutputStream(dest))) { out=>
+          out.setMethod(ZipEntry.DEFLATED)
+          out.setLevel(9)
+          // NOTE(review): the Result returned by export_pb is discarded here, so a
+          // Failure reported by the store is not surfaced to the user.
+          store.export_pb(new StreamManager[OutputStream]() {
+            // Wraps func in its own zip entry of the given name.
+            def entry(name:String, func: (OutputStream) => Unit) = {
+              out.putNextEntry(new ZipEntry(name));
+              func(out)
+              out.closeEntry();
+            }
+            def using_queue_stream(func: (OutputStream) => Unit) = entry("queues.dat", func)
+            def using_queue_entry_stream(func: (OutputStream) => Unit) = entry("queue_entries.dat", func)
+            def using_message_stream(func: (OutputStream) => Unit) = entry("messages.dat", func)
+          })
+        }
+      } finally {
+        // Stop the store even when the export throws, so it is not left running.
+        ServiceControl.stop(store, "store stop");
+      }
+
+    } catch {
+      case x:Failure=>
+        error(x.getMessage)
+    }
+    null
+  }
+
+
+}
\ No newline at end of file
Modified:
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala?rev=1054036&r1=1054035&r2=1054036&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
Fri Dec 31 01:21:08 2010
@@ -30,6 +30,8 @@ import org.apache.activemq.apollo.util._
import ReporterLevel._
import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained,
ListEventAggregator}
import org.apache.activemq.apollo.util.OptionSupport._
+import java.io.{InputStream, OutputStream}
+import scala.util.continuations._
object HawtDBStore extends Log {
val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -286,4 +288,20 @@ class HawtDBStore extends DelayingStoreS
callback(rc)
}
+
+ /**
+ * Export is not implemented for this store: the body always evaluates to
+ * Failure("not supported") and never touches the provided streams.
+ *
+ * NOTE(review): the commented-out call trailing the Failure line passes three
+ * separate streams, which does not match the StreamManager parameter declared here.
+ */
+ def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String]
@suspendable = executor_pool ! {
+ Failure("not supported")// client.export_pb(queue_stream, message_stream,
queue_entry_stream)
+ }
+
+ /**
+ * Import is not implemented for this store: the body always evaluates to
+ * Failure("not supported") and never reads the provided streams.
+ *
+ * NOTE(review): the commented-out call trailing the Failure line passes three
+ * separate streams, which does not match the StreamManager parameter declared here.
+ */
+ def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String]
@suspendable = executor_pool ! {
+ Failure("not supported")//client.import_pb(queue_stream, message_stream,
queue_entry_stream)
+ }
}
Modified:
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1054036&r1=1054035&r2=1054036&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
Fri Dec 31 01:21:08 2010
@@ -27,9 +27,10 @@ import jdbm._
import btree.BTree
import htree.HTree
import java.util.Comparator
-import java.io.Serializable
import jdbm.helper._
import PBSupport._
+import org.fusesource.hawtbuf.proto.PBMessageFactory
+import java.io.{EOFException, InputStream, OutputStream, Serializable}
object JDBM2Client extends Log {
@@ -408,4 +409,106 @@ class JDBM2Client(store: JDBM2Store) {
def getLastQueueKey:Long = last_queue_key
+
+ /**
+  * Writes the contents of the store to the supplied streams as framed protobuf
+  * records, in three passes: queue records, message records and queue entry
+  * records.
+  *
+  * @param streams supplies the output stream used for each record type
+  * @return Success(Zilch) when all three passes complete; otherwise a Failure
+  *         describing the exception that interrupted the export
+  */
+ def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] = {
+   try {
+     import PBSupport._
+
+     // Pass 1: queue records.
+     streams.using_queue_stream { queue_stream=>
+       queues_db.cursor { (_, value) =>
+         // The typed val presumably triggers a PBSupport conversion - TODO confirm.
+         val record:QueueRecord = value
+         record.writeFramed(queue_stream)
+         true // presumably tells the cursor to keep iterating - TODO confirm
+       }
+     }
+
+     // Pass 2: message records.
+     streams.using_message_stream { message_stream=>
+       messages_db.cursor { (_, value) =>
+         val record:MessageRecord = value
+         record.writeFramed(message_stream)
+         true
+       }
+     }
+
+     // Pass 3: queue entry records.
+     streams.using_queue_entry_stream { queue_entry_stream=>
+       entries_db.cursor { (_, value) =>
+         val record:QueueEntryRecord = value
+         record.writeFramed(queue_entry_stream)
+         true
+       }
+     }
+     Success(Zilch)
+
+   } catch {
+     case x:Exception=>
+       // getMessage can be null (e.g. NullPointerException); never report Failure(null).
+       Failure(Option(x.getMessage).getOrElse(x.toString))
+   }
+ }
+
+ /**
+  * Replaces the contents of the store with the framed protobuf records read from
+  * the supplied streams. The store is purged first; queue records, message
+  * records and queue entry records are then loaded in that order, committing
+  * periodically while loading.
+  *
+  * @param streams supplies the input stream used for each record type
+  * @return Success(Zilch) when every stream was consumed; otherwise a Failure
+  *         describing the exception that interrupted the import
+  */
+ def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] = {
+   try {
+     purge
+
+     // Running total since the last commit; check_flush commits once it exceeds `max`.
+     var size =0
+     def check_flush(incr:Int, max:Int) = {
+       size += incr
+       if( size > max ) {
+         recman.commit
+         size = 0
+       }
+     }
+
+     transaction {
+
+       // Parses framed records from `stream` and hands each one to `func` until an
+       // EOFException is raised, which is treated as the normal end of the stream.
+       def foreach[Buffer] (stream:InputStream, fact:PBMessageFactory[_,_])(func: (Buffer)=>Unit):Unit = {
+         var done = false
+         do {
+           try {
+             func(fact.parseFramed(stream).asInstanceOf[Buffer])
+           } catch {
+             case x:EOFException =>
+               done = true
+           }
+         } while( !done )
+       }
+
+
+       import PBSupport._
+
+       streams.using_queue_stream { queue_stream=>
+         foreach(queue_stream, QueuePB.FACTORY) { pb=>
+           val record:QueueRecord = pb
+           queues_db.put(record.key, record)
+           check_flush(1, 10000)
+         }
+       }
+
+       recman.commit
+
+       streams.using_message_stream { message_stream=>
+         foreach(message_stream, MessagePB.FACTORY) { pb=>
+           val record:MessageRecord = pb
+           messages_db.put(record.key, record)
+           // Was 1024*124*10, which looks like a typo for 1024*1024*10;
+           // record.size is presumably a byte count - TODO confirm.
+           check_flush(record.size, 1024*1024*10)
+         }
+       }
+
+       recman.commit
+
+       streams.using_queue_entry_stream { queue_entry_stream=>
+         foreach(queue_entry_stream, QueueEntryPB.FACTORY) { pb=>
+           val record:QueueEntryRecord = pb
+           entries_db.insert((record.queue_key, record.entry_seq), record, true)
+           add_message_reference(record.message_key)
+           check_flush(1, 10000)
+         }
+       }
+
+     }
+     Success(Zilch)
+
+   } catch {
+     case x:Exception=>
+       // getMessage can be null (e.g. NullPointerException); never report Failure(null).
+       Failure(Option(x.getMessage).getOrElse(x.toString))
+   }
+ }
}
Modified:
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1054036&r1=1054035&r2=1054036&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
Fri Dec 31 01:21:08 2010
@@ -28,6 +28,7 @@ import ReporterLevel._
import org.fusesource.hawtdispatch.ListEventAggregator
import org.apache.activemq.apollo.dto.{StoreStatusDTO, IntMetricDTO,
TimeMetricDTO, StoreDTO}
import org.apache.activemq.apollo.util.OptionSupport._
+import java.io.{InputStream, OutputStream}
import scala.util.continuations._
/**
@@ -287,4 +288,20 @@ class JDBM2Store extends DelayingStoreSu
callback(rc)
}
+
+ /**
+ * Exports the contents of the store to the provided streams. Each stream should
+ * contain a list of framed protobuf objects with the corresponding object types.
+ * The work is delegated to client.export_pb and submitted through executor.
+ */
+ def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String]
@suspendable = executor ! {
+ client.export_pb(streams)
+ }
+
+ /**
+ * Imports a previously exported set of streams. This deletes any previous data
+ * in the store.
+ * The work is delegated to client.import_pb and submitted through executor.
+ */
+ def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String]
@suspendable = executor ! {
+ client.import_pb(streams)
+ }
}