import pandas as pd
import pyarrow as pa
import pyarrow.parquet as apq
import fastparquet as fpq
import datetime
import numpy as np
import tempfile
import os

import logging
# Configure the root logger once at import time. Every record is prefixed with
# a wall-clock timestamp carrying milliseconds (datefmt supplies the seconds
# resolution part, %(msecs)03d appends the milliseconds), so the gap between a
# "Writing/Reading ..." line and the matching "Written./Read." line can be
# read off the log as the duration of that operation.
logging.basicConfig(
    format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S')

# aparquet = Arrow's Parquet
# fparquet = fastparquet

def write_aparquet(df, path):
    """Write *df* to *path* as a Parquet file using pyarrow.

    The frame is converted to an Arrow table without its index, then written
    with snappy compression and dictionary encoding turned off. A log line is
    emitted before and after the write; nothing is returned.
    """
    logging.info(f'Writing parquet to {path} [Arrow]')
    # Convert up front so the file handle is only held for the actual write.
    table = pa.Table.from_pandas(df, preserve_index=False)
    with open(path, 'wb') as out_file:
        apq.write_table(
            table,
            out_file,
            compression='snappy',
            use_dictionary=False,
        )
    logging.info('Written.')

def write_fparquet(df, path):
    """Write *df* to *path* as a Parquet file using fastparquet.

    fastparquet's default write options are used. A log line is emitted
    before and after the write; nothing is returned.
    """
    logging.info(f"Writing parquet to {path} [FastParquet]")
    fpq.write(filename=path, data=df)
    logging.info("Written.")

def read_aparquet(path):
    """Read the Parquet file at *path* with pyarrow and convert it to pandas.

    The resulting DataFrame is discarded: the function exists only to
    exercise the read path. A log line is emitted before and after the read.
    """
    logging.info(f"Reading parquet from {path} [Arrow]")
    with open(path, 'rb') as in_file:
        table = apq.read_table(in_file)
        # Include the pandas conversion, matching what the writer started from.
        table.to_pandas()
    logging.info("Read.")

def read_fparquet(path):
    """Read the Parquet file at *path* with fastparquet and convert it to pandas.

    The resulting DataFrame is discarded: the function exists only to
    exercise the read path. A log line is emitted before and after the read.
    """
    logging.info(f"Reading parquet from {path} [FastParquet]")
    parquet_file = fpq.ParquetFile(path)
    parquet_file.to_pandas()
    logging.info("Read.")

def write_arrow(df, path):
    """Write *df* to *path* in the Arrow IPC streaming format.

    The frame is converted to an Arrow table (index handling is left at
    pyarrow's default) and streamed directly into the destination file.
    Both the file and the stream writer are managed by ``with`` blocks, so
    they are closed even if writing fails part-way, and no second full copy
    of the serialized table is held in memory before it reaches disk.

    A log line is emitted before and after the write; nothing is returned.
    """
    logging.info(f"Writing Arrow to {path}")
    table = pa.Table.from_pandas(df)
    with pa.OSFile(path, 'wb') as sink:
        # Closing the writer (on leaving the block) writes the end-of-stream
        # marker, so it must happen before the file itself is closed.
        with pa.ipc.new_stream(sink, table.schema) as writer:
            writer.write_table(table)
    logging.info("Arrow written.")

def read_arrow(path):
    """Read the Arrow IPC stream at *path* and convert it to pandas.

    The resulting DataFrame is discarded: the function exists only to
    exercise the read path. A log line is emitted before and after the read.
    """
    logging.info(f"Reading Arrow from {path}")
    stream_reader = pa.ipc.open_stream(path)
    with stream_reader:
        stream_reader.read_pandas()
    logging.info("Read.")


if __name__ == '__main__':
    # Driver: build one "tall" and one "wide" frame of random floats, then
    # write and read each with pyarrow Parquet, fastparquet and Arrow IPC.
    # Timings are read from the timestamps of the log lines each helper emits.
    ncols = 1000
    nrows = 10000

    # Tall: nrows x ncols, columns labelled '0'..'ncols-1'.
    df_tall = pd.DataFrame({str(i): np.random.rand(nrows) for i in range(0, ncols)})
    # Wide: the transpose (ncols x nrows). Its column labels come from the
    # tall frame's integer row index, so they are converted to strings
    # (presumably because the Parquet writers expect string column names).
    df_wide = df_tall.T
    df_wide.columns = [str(c) for c in df_wide.columns]

    with tempfile.TemporaryDirectory() as folder:
        # Build every path exactly once so a write and its matching read
        # cannot drift apart through a typo in the file name.
        tall_apq = os.path.join(folder, 'example_tall_apq.pq')
        wide_apq = os.path.join(folder, 'example_wide_apq.pq')
        tall_fpq = os.path.join(folder, 'example_tall_fpq.pq')
        wide_fpq = os.path.join(folder, 'example_wide_fpq.pq')
        wide_arrow = os.path.join(folder, 'example_wide.arrows')
        tall_arrow = os.path.join(folder, 'example_tall.arrows')

        write_aparquet(df_tall, tall_apq)
        write_aparquet(df_wide, wide_apq)
        write_fparquet(df_tall, tall_fpq)
        write_fparquet(df_wide, wide_fpq)

        read_aparquet(tall_apq)
        read_aparquet(wide_apq)
        # Read back the files fastparquet itself wrote; previously these
        # calls re-read the pyarrow-written files and the fastparquet output
        # was never read at all.
        read_fparquet(tall_fpq)
        read_fparquet(wide_fpq)

        write_arrow(df_wide, wide_arrow)
        write_arrow(df_tall, tall_arrow)
        read_arrow(wide_arrow)
        read_arrow(tall_arrow)
