GitHub user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r199508609 --- Diff: python/pyspark/serializers.py --- @@ -184,27 +184,59 @@ def loads(self, obj): raise NotImplementedError -class ArrowSerializer(FramedSerializer): +class BatchOrderSerializer(Serializer): """ - Serializes bytes as Arrow data with the Arrow file format. + Deserialize a stream of batches followed by batch order information. """ - def dumps(self, batch): + def __init__(self, serializer): + self.serializer = serializer + self.batch_order = [] + + def dump_stream(self, iterator, stream): + return self.serializer.dump_stream(iterator, stream) + + def load_stream(self, stream): + for batch in self.serializer.load_stream(stream): + yield batch + num = read_int(stream) + for i in xrange(num): + index = read_int(stream) + self.batch_order.append(index) + raise StopIteration() + + def get_batch_order(self): --- End diff -- Maybe we should initialize `self.batch_order = None` and add `assert self.batch_order is not None` here.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org