Motivation

We want to use Arrow as a general data serialization framework in distributed stream data processing. We are working on Ray <https://github.com/ray-project/ray>, which is written in C++ at the low level and Java/Python at the high level, and we want to transfer streaming data between Java/Python/C++ efficiently. Arrow is a great framework for cross-language data transfer, but it seems more appropriate for batch columnar data. Is it appropriate for distributed stream data processing? If not, will there be more support for stream processing? Or is there something I missed?

Benchmark

1. If we use UnionArray: [benchmark plots attached]
2. If we use RecordBatch, the batch size needs to be greater than about 50~200 records before deserialization becomes faster than pickle, but the latency of batching that many records won't be acceptable in streaming. [benchmark plot attached]
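To make point 2 concrete, here is a minimal sketch of what I mean by the RecordBatch approach, using pyarrow's streaming IPC writer; the 'id'/'value' schema and the batch size are made up purely for illustration, not our actual data:

import pyarrow as pa

# A hypothetical record type; 'id'/'value' are made-up column names.
records = [{'id': i, 'value': i * 0.5} for i in range(100)]

# Buffer a group of records and build one small RecordBatch from it.
batch = pa.RecordBatch.from_arrays(
    [pa.array([r['id'] for r in records], type=pa.int64()),
     pa.array([r['value'] for r in records], type=pa.float64())],
    names=['id', 'value'])

# Producer side: write batches with the Arrow streaming IPC format.
sink = pa.BufferOutputStream()
writer = pa.RecordBatchStreamWriter(sink, batch.schema)
writer.write_batch(batch)   # in a real stream this would be called repeatedly
writer.close()
buf = sink.getvalue()

# Consumer side: read record batches one by one as they arrive.
reader = pa.RecordBatchStreamReader(pa.BufferReader(buf))
received = reader.read_next_batch()
print(received.num_rows, received.schema)

The trade-off is exactly what the plots show: the per-record overhead only amortizes once each batch holds enough records, and waiting to fill a batch adds latency.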
It seems neither is an appropriate way, or is there a better way?

Benchmark code

''' test arrow/pickle performance'''
import pickle
import pyarrow as pa
import matplotlib.pyplot as plt
import numpy as np
import timeit
import datetime
import copy
import os
from collections import OrderedDict

dir_path = os.path.dirname(os.path.realpath(__file__))


# Time pickle vs. Arrow serialization and record the serialized sizes.
def benchmark_ser(batches, number=10):
    pickle_results = []
    arrow_results = []
    pickle_sizes = []
    arrow_sizes = []
    for obj_batch in batches:
        pickle_serialize = timeit.timeit(
            lambda: pickle.dumps(obj_batch, protocol=pickle.HIGHEST_PROTOCOL),
            number=number)
        pickle_results.append(pickle_serialize)
        pickle_sizes.append(len(pickle.dumps(obj_batch, protocol=pickle.HIGHEST_PROTOCOL)))
        arrow_serialize = timeit.timeit(
            lambda: serialize_by_arrow_array(obj_batch), number=number)
        arrow_results.append(arrow_serialize)
        arrow_sizes.append(serialize_by_arrow_array(obj_batch).size)
    return [pickle_results, arrow_results, pickle_sizes, arrow_sizes]


# Time pickle vs. Arrow deserialization.
def benchmark_deser(batches, number=10):
    pickle_results = []
    arrow_results = []
    for obj_batch in batches:
        serialized_obj = pickle.dumps(obj_batch, pickle.HIGHEST_PROTOCOL)
        pickle_deserialize = timeit.timeit(
            lambda: pickle.loads(serialized_obj), number=number)
        pickle_results.append(pickle_deserialize)
        serialized_obj = serialize_by_arrow_array(obj_batch)
        arrow_deserialize = timeit.timeit(
            lambda: pa.deserialize(serialized_obj), number=number)
        arrow_results.append(arrow_deserialize)
    return [pickle_results, arrow_results]


# Serialize a batch of records as a list of Arrow arrays.
def serialize_by_arrow_array(obj_batch):
    arrow_arrays = [pa.array(record) if not isinstance(record, pa.Array) else record
                    for record in obj_batch]
    return pa.serialize(arrow_arrays).to_buffer()


plot_dir = '{}/{}'.format(dir_path, datetime.datetime.now().strftime('%m%d_%H%M_%S'))
if not os.path.exists(plot_dir):
    os.makedirs(plot_dir)


def plot_time(pickle_times, arrow_times, batch_sizes, title, filename):
    fig, ax = plt.subplots()
    fig.set_size_inches(10, 8)
    bar_width = 0.35
    n_groups = len(batch_sizes)
    index = np.arange(n_groups)
    opacity = 0.6
    plt.bar(index, pickle_times, bar_width, alpha=opacity, color='r', label='Pickle')
    plt.bar(index + bar_width, arrow_times, bar_width, alpha=opacity, color='c', label='Arrow')
    plt.title(title, fontweight='bold')
    plt.ylabel('Time (seconds)', fontsize=10)
    plt.xticks(index + bar_width / 2, batch_sizes, fontsize=10)
    plt.legend(fontsize=10, bbox_to_anchor=(1, 1))
    plt.tight_layout()
    plt.yticks(fontsize=10)
    plt.savefig(plot_dir + '/plot-' + filename + '.png', format='png')


def plot_size(pickle_sizes, arrow_sizes, batch_sizes, title, filename):
    fig, ax = plt.subplots()
    fig.set_size_inches(10, 8)
    bar_width = 0.35
    n_groups = len(batch_sizes)
    index = np.arange(n_groups)
    opacity = 0.6
    plt.bar(index, pickle_sizes, bar_width, alpha=opacity, color='r', label='Pickle')
    plt.bar(index + bar_width, arrow_sizes, bar_width, alpha=opacity, color='c', label='Arrow')
    plt.title(title, fontweight='bold')
    plt.ylabel('Space (Byte)', fontsize=10)
    plt.xticks(index + bar_width / 2, batch_sizes, fontsize=10)
    plt.legend(fontsize=10, bbox_to_anchor=(1, 1))
    plt.tight_layout()
    plt.yticks(fontsize=10)
    plt.savefig(plot_dir + '/plot-' + filename + '.png', format='png')


# Build a dense UnionArray with a string child and a double child.
def get_union_obj():
    size = 200
    str_array = pa.array(['str-' + str(i) for i in range(size)])
    int_array = pa.array(np.random.randn(size).tolist())
    types = pa.array([0 for _ in range(size)] + [1 for _ in range(size)], type=pa.int8())
    offsets = pa.array(list(range(size)) + list(range(size)), type=pa.int32())
    union_arr = pa.UnionArray.from_dense(types, offsets, [str_array, int_array])
    return union_arr


test_objects_generater = [
    lambda: np.random.randn(500),
    lambda: np.random.randn(500).tolist(),
    lambda: get_union_obj()
]

titles = [
    'numpy arrays',
    'list of ints',
    'union array of strings and ints'
]


def plot_benchmark():
    batch_sizes = list(OrderedDict.fromkeys(int(i) for i in np.geomspace(1, 1000, num=25)))
    for i in range(len(test_objects_generater)):
        batches = [[test_objects_generater[i]() for _ in range(batch_size)]
                   for batch_size in batch_sizes]
        ser_result = benchmark_ser(batches=batches)
        plot_time(*ser_result[0:2], batch_sizes, 'serialization: ' + titles[i], 'ser_time' + str(i))
        plot_size(*ser_result[2:], batch_sizes, 'serialization byte size: ' + titles[i], 'ser_size' + str(i))
        deser = benchmark_deser(batches=batches)
        plot_time(*deser, batch_sizes, 'deserialization: ' + titles[i], 'deser_time-' + str(i))


if __name__ == "__main__":
    plot_benchmark()

Question

So if I want to use Arrow as the data serialization framework in distributed stream data processing, what's the right way? Since stream processing is a widespread scenario in data processing, frameworks such as Flink and Spark Structured Streaming are becoming more and more popular. Is there a possibility to add special support for stream processing in Arrow, so that we can also benefit from its cross-language support and efficient memory layout?