Motivation

We want to use Arrow as a general data serialization framework for
distributed stream data processing. We are working on Ray
<https://github.com/ray-project/ray>, which is written in C++ at the low
level and in Java/Python at the high level. We want to transfer streaming
data between Java/Python/C++ efficiently. Arrow is a great framework for
cross-language data transfer, but it seems more appropriate for batch
columnar data. Is it appropriate for distributed stream data processing? If
not, will there be more support for stream data processing? Or is there
something I am missing?
Benchmark
1. If we use UnionArray:
[image: image.png]
[image: image.png]
[image: image.png]
2. If we use RecordBatch, the batch size needs to be greater than 50~200 to
get better deserialization performance than pickle. But the resulting
latency won't be acceptable in streaming.
[image: image.png]

It seems that neither is an appropriate approach. Or is there a better way?
Benchmark code

"""Test arrow/pickle performance."""
# NOTE: the original text had the docstring and every import fused onto a few
# wrapped lines, which is not valid Python; they are restored one per line.
import copy
import datetime
import os
import pickle
import timeit
from collections import OrderedDict

import matplotlib.pyplot as plt
import numpy as np
import pyarrow as pa

# Directory containing this script; plot output directories are created
# beneath it.
dir_path = os.path.dirname(os.path.realpath(__file__))
def benchmark_ser(batches, number=10):
    """Time serialization of every batch with pickle and with Arrow.

    Args:
        batches: iterable of batches; each batch is passed whole to
            ``pickle.dumps`` and to ``serialize_by_arrow_array``.
        number: how many times ``timeit`` executes each serializer.

    Returns:
        A list ``[pickle_times, arrow_times, pickle_sizes, arrow_sizes]``
        with one entry per batch. Times are the totals reported by
        ``timeit.timeit`` for ``number`` executions; the pickle size is the
        length of the pickled bytes and the Arrow size is the ``size``
        attribute of the serialized result.
    """
    pickle_times, arrow_times = [], []
    pickle_bytes, arrow_bytes = [], []

    for batch in batches:
        def dump_with_pickle():
            return pickle.dumps(batch, protocol=pickle.HIGHEST_PROTOCOL)

        def dump_with_arrow():
            return serialize_by_arrow_array(batch)

        # Keep the original measurement order: pickle time, pickle size,
        # Arrow time, Arrow size.
        pickle_times.append(timeit.timeit(dump_with_pickle, number=number))
        pickle_bytes.append(len(dump_with_pickle()))
        arrow_times.append(timeit.timeit(dump_with_arrow, number=number))
        arrow_bytes.append(dump_with_arrow().size)

    return [pickle_times, arrow_times, pickle_bytes, arrow_bytes]
def benchmark_deser(batches, number=10):
    """Time deserialization of every batch with pickle and with Arrow.

    Each batch is serialized once up front (outside the timed region), then
    only the load step is timed.

    Args:
        batches: iterable of batches to serialize and then deserialize.
        number: how many times ``timeit`` executes each deserializer.

    Returns:
        A list ``[pickle_times, arrow_times]`` with one ``timeit.timeit``
        total per batch.
    """
    pickle_times = []
    arrow_times = []

    for batch in batches:
        pickled = pickle.dumps(batch, pickle.HIGHEST_PROTOCOL)
        elapsed_pickle = timeit.timeit(
            lambda: pickle.loads(pickled), number=number)
        pickle_times.append(elapsed_pickle)

        arrow_buffer = serialize_by_arrow_array(batch)
        elapsed_arrow = timeit.timeit(
            lambda: pa.deserialize(arrow_buffer), number=number)
        arrow_times.append(elapsed_arrow)

    return [pickle_times, arrow_times]
def serialize_by_arrow_array(obj_batch):
    """Serialize a batch of records through pyarrow and return the buffer.

    Records that are already ``pa.Array`` instances are used as-is; every
    other record is converted with ``pa.array``. The resulting list is
    passed to ``pa.serialize`` and the result of ``to_buffer()`` is returned.
    """
    arrow_arrays = []
    for record in obj_batch:
        if isinstance(record, pa.Array):
            arrow_arrays.append(record)
        else:
            arrow_arrays.append(pa.array(record))
    serialized = pa.serialize(arrow_arrays)
    return serialized.to_buffer()


# Per-run output directory for the plots, named after the current time
# (month/day_hour/minute_second) and located next to this script.
plot_dir = '{}/{}'.format(
    dir_path, datetime.datetime.now().strftime('%m%d_%H%M_%S'))
# The original text fused the existence check onto the assignment line, which
# is not valid Python. exist_ok=True creates the directory only when missing
# without a separate check-then-create step.
os.makedirs(plot_dir, exist_ok=True)
def plot_time(pickle_times, arrow_times, batch_sizes, title, filename):
    """Save a grouped bar chart comparing pickle and Arrow timings.

    Args:
        pickle_times: one timing value per batch size for pickle.
        arrow_times: one timing value per batch size for Arrow.
        batch_sizes: batch sizes, used as the x-axis tick labels.
        title: chart title.
        filename: stem of the output file; the chart is written to
            ``<plot_dir>/plot-<filename>.png``.
    """
    # The axes object is not needed; all drawing goes through pyplot, which
    # targets the figure created here.
    fig, _ = plt.subplots()
    try:
        fig.set_size_inches(10, 8)

        bar_width = 0.35
        n_groups = len(batch_sizes)
        index = np.arange(n_groups)
        opacity = 0.6

        plt.bar(index, pickle_times, bar_width,
                alpha=opacity, color='r', label='Pickle')

        # Shift the Arrow bars by one bar width so the two series sit side
        # by side within each group.
        plt.bar(index + bar_width, arrow_times, bar_width,
                alpha=opacity, color='c', label='Arrow')

        plt.title(title, fontweight='bold')
        plt.ylabel('Time (seconds)', fontsize=10)
        # Centre each tick label between the two bars of its group.
        plt.xticks(index + bar_width / 2, batch_sizes, fontsize=10)
        plt.legend(fontsize=10, bbox_to_anchor=(1, 1))
        plt.tight_layout()
        plt.yticks(fontsize=10)
        plt.savefig(plot_dir + '/plot-' + filename + '.png', format='png')
    finally:
        # pyplot keeps every figure alive until it is closed; release it so
        # repeated calls do not accumulate open figures.
        plt.close(fig)

def plot_size(pickle_sizes, arrow_sizes, batch_sizes, title, filename):
    """Save a grouped bar chart comparing pickle and Arrow payload sizes.

    Args:
        pickle_sizes: one size value per batch size for pickle.
        arrow_sizes: one size value per batch size for Arrow.
        batch_sizes: batch sizes, used as the x-axis tick labels.
        title: chart title.
        filename: stem of the output file; the chart is written to
            ``<plot_dir>/plot-<filename>.png``.
    """
    # The axes object is not needed; all drawing goes through pyplot, which
    # targets the figure created here.
    fig, _ = plt.subplots()
    try:
        fig.set_size_inches(10, 8)

        bar_width = 0.35
        n_groups = len(batch_sizes)
        index = np.arange(n_groups)
        opacity = 0.6

        plt.bar(index, pickle_sizes, bar_width,
                alpha=opacity, color='r', label='Pickle')

        # Shift the Arrow bars by one bar width so the two series sit side
        # by side within each group.
        plt.bar(index + bar_width, arrow_sizes, bar_width,
                alpha=opacity, color='c', label='Arrow')

        plt.title(title, fontweight='bold')
        plt.ylabel('Space (Byte)', fontsize=10)
        # Centre each tick label between the two bars of its group.
        plt.xticks(index + bar_width / 2, batch_sizes, fontsize=10)
        plt.legend(fontsize=10, bbox_to_anchor=(1, 1))
        plt.tight_layout()
        plt.yticks(fontsize=10)
        plt.savefig(plot_dir + '/plot-' + filename + '.png', format='png')
    finally:
        # pyplot keeps every figure alive until it is closed; release it so
        # repeated calls do not accumulate open figures.
        plt.close(fig)
def get_union_obj():
    """Build a dense pyarrow ``UnionArray`` with two children.

    The result has 400 slots: the first 200 carry type id 0 and point at
    the strings ``'str-0'`` .. ``'str-199'``; the last 200 carry type id 1
    and point at 200 values drawn from ``np.random.randn`` (floats).
    """
    size = 200
    string_child = pa.array(['str-{}'.format(i) for i in range(size)])
    float_child = pa.array(np.random.randn(size).tolist())

    # One type id per slot: all string slots first, then all numeric slots.
    type_ids = pa.array([0] * size + [1] * size, type=pa.int8())
    # Offsets index into the child selected by the type id, so each half
    # counts 0..size-1 independently.
    value_offsets = pa.array(list(range(size)) * 2, type=pa.int32())

    return pa.UnionArray.from_dense(
        type_ids, value_offsets, [string_child, float_child])

# Number of elements in each generated numeric sample.
_SAMPLE_LENGTH = 500

# Zero-argument factories; each call produces one fresh record for a batch.
# Entry i is described by titles[i].
# NOTE(review): np.random.randn yields floats, so the "ints" wording in the
# titles below is loose; the strings are kept unchanged because they appear
# in the generated plots.
test_objects_generater = [
    lambda: np.random.randn(_SAMPLE_LENGTH),
    lambda: np.random.randn(_SAMPLE_LENGTH).tolist(),
    lambda: get_union_obj(),
]

# Human-readable plot titles, index-aligned with test_objects_generater.
titles = [
    'numpy arrays',
    'list of ints',
    'union array of strings and ints',
]
def plot_benchmark():
    """Run the serialization and deserialization benchmarks and plot them.

    For every factory in ``test_objects_generater`` this builds one batch
    per batch size, benchmarks it with ``benchmark_ser`` and
    ``benchmark_deser``, and writes three charts through ``plot_time`` /
    ``plot_size``: serialization time, serialized size and deserialization
    time.
    """
    # Truncating the geometric sequence to ints yields duplicates at the
    # small end; OrderedDict.fromkeys drops them while keeping the order.
    batch_sizes = list(OrderedDict.fromkeys(
        int(size) for size in np.geomspace(1, 1000, num=25)))

    for i, make_object in enumerate(test_objects_generater):
        title = titles[i]
        batches = [[make_object() for _ in range(batch_size)]
                   for batch_size in batch_sizes]

        ser_result = benchmark_ser(batches=batches)
        plot_time(*ser_result[0:2], batch_sizes,
                  'serialization: ' + title, 'ser_time' + str(i))
        # The original text had this string literal broken across two lines
        # by mail wrapping, which is a syntax error; it is rejoined here.
        plot_size(*ser_result[2:], batch_sizes,
                  'serialization byte size: ' + title, 'ser_size' + str(i))

        deser = benchmark_deser(batches=batches)
        plot_time(*deser, batch_sizes,
                  'deserialization: ' + title, 'deser_time-' + str(i))

# Run the benchmarks and write the plots only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    plot_benchmark()

Question
So if I want to use Arrow as a data serialization framework in distributed
stream data processing, what's the right way?
Since stream processing is a widespread scenario in data processing,
frameworks such as Flink and Spark Structured Streaming are becoming more
and more popular. Is there a possibility of adding special support for
stream processing to Arrow, so that we can also benefit from its
cross-language support and efficient memory layout?

Reply via email to