One more follow-up here. Adding the statement below seems to coax out an
error. Does it appear that the filenames aren't making their way to the
routines that extract the partition fields from them?
FWIW, this error seems to be coming from here:
https://github.com/apache/arrow/blob/6e3f26af658bfca602e711ea326f1985b62bca1d/cpp/src/arrow/dataset/partition.cc#L511
partitioning = pds.FilenamePartitioning(schema=part_schema).discover(schema=part_schema)
ds_partitioned = pds.dataset(
    csv_files, format=csvformat, filesystem=fsspec_fs, partitioning=partitioning,
)
# Traceback (most recent call last):
#   File "/zip_of_csvs_test.py", line 82, in <module>
#     ds_partitioned = pds.dataset(
#   File "/.pyenv/versions/3.8.2/lib/python3.8/site-packages/pyarrow/dataset.py", line 697, in dataset
#     return _filesystem_dataset(source, **kwargs)
#   File "/.pyenv/versions/3.8.2/lib/python3.8/site-packages/pyarrow/dataset.py", line 449, in _filesystem_dataset
#     return factory.finish(schema)
#   File "pyarrow/_dataset.pyx", line 1857, in pyarrow._dataset.DatasetFactory.finish
#   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
#   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
# pyarrow.lib.ArrowInvalid: No non-null segments were available for field 'frequency'; couldn't infer type
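
For what it's worth, here is a quick sanity check of the paths pyarrow ends
up seeing for these files (just a sketch reusing fsspec_fs and csv_files
from the script quoted below; PyFileSystem/FSSpecHandler is, as far as I can
tell, how pyarrow wraps an fsspec filesystem internally):

from pyarrow import fs as pafs

# wrap the fsspec zip filesystem in pyarrow's filesystem interface
py_fs = pafs.PyFileSystem(pafs.FSSpecHandler(fsspec_fs))
# print the path and base name each FileInfo carries, i.e. what the
# partitioning would have available to parse
for info in py_fs.get_file_info(csv_files):
    print(info.path, info.base_name)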
On Wed, Jul 20, 2022 at 11:19 PM Kirby, Adam <[email protected]> wrote:
> As a follow-up, I can confirm that this appears to work very well for
> non-partitioned data, at least.
> In my case, the data are ‘partitioned’, and while the rest of the data are
> parsed correctly, the partition fields don’t seem to be extracted from
> the filenames. Does it appear that I am doing something incorrectly?
>
> Thank you!
>
> —
>
> #!/usr/bin/env python3
> import fsspec
> import pyarrow as pa
> import pyarrow.csv as pcsv
> import pyarrow.dataset as pds
>
> sample_file = (
>     "https://firstratedata.com/_data/_deploy/stocks-complete_bundle_sample.zip"
> )
> schema = pa.schema(
>     [
>         pa.field("datetime", pa.timestamp("s")),
>         pa.field("open", pa.float64()),
>         pa.field("high", pa.float64()),
>         pa.field("low", pa.float64()),
>         pa.field("close", pa.float64()),
>         pa.field("volume", pa.float64()),
>     ],
> )
> read_opts, convert_opts = pcsv.ReadOptions(), pcsv.ConvertOptions()
> convert_opts.column_types = schema
> read_opts.column_names = schema.names
> csvformat = pds.CsvFileFormat(convert_options=convert_opts, read_options=read_opts)
>
> fsspec_fs = fsspec.filesystem("zip", fo=fsspec.open(sample_file))
>
> csv_files = [_ for _ in fsspec_fs.ls("/") if _.endswith("_sample.txt")]
> print(csv_files)
> # ['AAPL_1hour_sample.txt', 'AAPL_1min_sample.txt', 'AAPL_30min_sample.txt',
> #  'AAPL_5min_sample.txt', 'AMZN_1hour_sample.txt', 'AMZN_1min_sample.txt',
> #  'AMZN_30min_sample.txt', 'AMZN_5min_sample.txt', 'MSFT_1hour_sample.txt',
> #  'MSFT_1min_sample.txt', 'MSFT_30min_sample.txt', 'MSFT_5min_sample.txt']
>
> part_schema = pa.schema([("symbol", pa.string()), ("frequency", pa.string())])
> partitioning = pds.FilenamePartitioning(schema=part_schema)
> # confirm filenames are parsed correctly
> print({_: str(partitioning.parse(_)) for _ in csv_files})
> # {
> # "AAPL_1hour_sample.txt": '((symbol == "AAPL") and (frequency == "1hour"))',
> # "AAPL_1min_sample.txt": '((symbol == "AAPL") and (frequency == "1min"))',
> # "AAPL_30min_sample.txt": '((symbol == "AAPL") and (frequency == "30min"))',
> # "AAPL_5min_sample.txt": '((symbol == "AAPL") and (frequency == "5min"))',
> # "AMZN_1hour_sample.txt": '((symbol == "AMZN") and (frequency == "1hour"))',
> # "AMZN_1min_sample.txt": '((symbol == "AMZN") and (frequency == "1min"))',
> # "AMZN_30min_sample.txt": '((symbol == "AMZN") and (frequency == "30min"))',
> # "AMZN_5min_sample.txt": '((symbol == "AMZN") and (frequency == "5min"))',
> # "MSFT_1hour_sample.txt": '((symbol == "MSFT") and (frequency == "1hour"))',
> # "MSFT_1min_sample.txt": '((symbol == "MSFT") and (frequency == "1min"))',
> # "MSFT_30min_sample.txt": '((symbol == "MSFT") and (frequency == "30min"))',
> # "MSFT_5min_sample.txt": '((symbol == "MSFT") and (frequency == "5min"))',
> # }
>
> ds_partitioned = pds.dataset(
>     csv_files, format=csvformat, filesystem=fsspec_fs, partitioning=partitioning,
> )
>
> print(ds_partitioned.head(5))
> # pyarrow.Table
> # datetime: timestamp[s]
> # open: double
> # high: double
> # low: double
> # close: double
> # volume: double
> # symbol: string
> # frequency: string
> # ----
> # datetime: [[2022-04-01 04:00:00,2022-04-01 05:00:00,2022-04-01 06:00:00,2022-04-01 07:00:00,2022-04-01 08:00:00]]
> # open: [[175.25,175.32,175.43,175.54,175.49]]
> # high: [[175.88,175.38,175.72,175.6,175.52]]
> # low: [[175.1,175.04,175.33,174.69,173.35]]
> # close: [[175.26,175.31,175.5,174.82,173.6]]
> # volume: [[24417,13692,90057,162983,736016]]
> # symbol: [[null,null,null,null,null]]
> # frequency: [[null,null,null,null,null]]
>
>
> On Wed, Jul 20, 2022 at 11:12 AM Kirby, Adam <[email protected]> wrote:
>
>> Micah, Great idea, thank you! I really appreciate the pointer.
>>
>> On Wed, Jul 20, 2022 at 12:04 AM Micah Kornfield <[email protected]>
>> wrote:
>>
>>> You could maybe use datasets on top of fsspec's zip file system [1]?
>>>
>>> [1]
>>> https://filesystem-spec.readthedocs.io/en/latest/_modules/fsspec/implementations/zip.html
>>>
>>> On Tuesday, July 19, 2022, Kirby, Adam <[email protected]> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I'm currently using pyarrow.csv.read_csv to parse a CSV stream that
>>>> originates from a ZIP of multiple CSV files. For now, I'm using a separate
>>>> implementation to do the streaming ZIP decompression, then
>>>> using pyarrow.csv.read_csv at each CSV file boundary.
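>>>>
>>>> Concretely, the current approach looks roughly like the sketch below
>>>> (simplified to use the standard zipfile module in place of the
>>>> streaming decompressor, and with "archive_of_csvs.zip" as a
>>>> placeholder name):
>>>>
>>>> import io
>>>> import zipfile
>>>> import pyarrow.csv as pcsv
>>>>
>>>> tables = []
>>>> with zipfile.ZipFile("archive_of_csvs.zip") as zf:
>>>>     for name in zf.namelist():
>>>>         # parse each member at its file boundary with read_csv
>>>>         with zf.open(name) as member:
>>>>             tables.append(pcsv.read_csv(io.BytesIO(member.read())))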
>>>>
>>>> I would love it if there were a way to leverage pyarrow to handle the
>>>> decompression. From what I've seen in examples, a ZIP file containing a
>>>> single CSV is supported -- that is, it's possible to operate on a
>>>> compressed CSV stream -- but I wonder if it's possible to handle a
>>>> compressed stream that contains multiple files?
>>>>
>>>> Thank you in advance!
>>>>
>>>