[ 
https://issues.apache.org/jira/browse/ARROW-10517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235388#comment-17235388
 ] 

Lance Dacey commented on ARROW-10517:
-------------------------------------

Latest adlfs (0.5.5):

 

This really creates the test.parquet file as well, not just the directory:
{code:java}
fs.mkdir("dev/test99999999999/2020/01/28/test.parquet", create_parents=True)
{code}
And if I try to run the same line again it it fails because the partition 
exists:
{code:python}
---------------------------------------------------------------------------
StorageErrorException: Operation returned an invalid status 'The specified blob 
already exists.'

During handling of the above exception, another exception occurred:

ResourceExistsError                       Traceback (most recent call last)
/c/airflow/test.py in <module>
----> 6 fs.mkdir("dev/test99999999999/2020/01/28/test.parquet", 
create_parents=True)

~/miniconda3/envs/airflow/lib/python3.8/site-packages/adlfs/spec.py in 
mkdir(self, path, delimiter, exist_ok, **kwargs)
    880 
    881     def mkdir(self, path, delimiter="/", exist_ok=False, **kwargs):
--> 882         maybe_sync(self._mkdir, self, path, delimiter, exist_ok)
    883 
    884     async def _mkdir(self, path, delimiter="/", exist_ok=False, 
**kwargs):

~/miniconda3/envs/airflow/lib/python3.8/site-packages/fsspec/asyn.py in 
maybe_sync(func, self, *args, **kwargs)
     98         if inspect.iscoroutinefunction(func):
     99             # run the awaitable on the loop
--> 100             return sync(loop, func, *args, **kwargs)
    101         else:
    102             # just call the blocking function

~/miniconda3/envs/airflow/lib/python3.8/site-packages/fsspec/asyn.py in 
sync(loop, func, callback_timeout, *args, **kwargs)
     69     if error[0]:
     70         typ, exc, tb = error[0]
---> 71         raise exc.with_traceback(tb)
     72     else:
     73         return result[0]

~/miniconda3/envs/airflow/lib/python3.8/site-packages/fsspec/asyn.py in f()
     53             if callback_timeout is not None:
     54                 future = asyncio.wait_for(future, callback_timeout)
---> 55             result[0] = await future
     56         except Exception:
     57             error[0] = sys.exc_info()

~/miniconda3/envs/airflow/lib/python3.8/site-packages/adlfs/spec.py in 
_mkdir(self, path, delimiter, exist_ok, **kwargs)
    918                     container=container_name
    919                 )
--> 920                 await container_client.upload_blob(name=path, data="")
    921             else:
    922                 ## everything else

~/miniconda3/envs/airflow/lib/python3.8/site-packages/azure/core/tracing/decorator_async.py
 in wrapper_use_tracer(*args, **kwargs)
     72             span_impl_type = settings.tracing_implementation()
     73             if span_impl_type is None:
---> 74                 return await func(*args, **kwargs)
     75 
     76             # Merge span is parameter is set, but only if no explicit 
parent are passed

~/miniconda3/envs/airflow/lib/python3.8/site-packages/azure/storage/blob/aio/_container_client_async.py
 in upload_blob(self, name, data, blob_type, length, metadata, **kwargs)
    715         timeout = kwargs.pop('timeout', None)
    716         encoding = kwargs.pop('encoding', 'UTF-8')
--> 717         await blob.upload_blob(
    718             data,
    719             blob_type=blob_type,

~/miniconda3/envs/airflow/lib/python3.8/site-packages/azure/core/tracing/decorator_async.py
 in wrapper_use_tracer(*args, **kwargs)
     72             span_impl_type = settings.tracing_implementation()
     73             if span_impl_type is None:
---> 74                 return await func(*args, **kwargs)
     75 
     76             # Merge span is parameter is set, but only if no explicit 
parent are passed

~/miniconda3/envs/airflow/lib/python3.8/site-packages/azure/storage/blob/aio/_blob_client_async.py
 in upload_blob(self, data, blob_type, length, metadata, **kwargs)
    267             **kwargs)
    268         if blob_type == BlobType.BlockBlob:
--> 269             return await upload_block_blob(**options)
    270         if blob_type == BlobType.PageBlob:
    271             return await upload_page_blob(**options)

~/miniconda3/envs/airflow/lib/python3.8/site-packages/azure/storage/blob/aio/_upload_helpers.py
 in upload_block_blob(client, data, stream, length, overwrite, headers, 
validate_content, max_concurrency, blob_settings, encryption_options, **kwargs)
    131     except StorageErrorException as error:
    132         try:
--> 133             process_storage_error(error)
    134         except ResourceModifiedError as mod_error:
    135             if not overwrite:

~/miniconda3/envs/airflow/lib/python3.8/site-packages/azure/storage/blob/_shared/response_handlers.py
 in process_storage_error(storage_error)
    145     error.error_code = error_code
    146     error.additional_info = additional_data
--> 147     raise error
    148 
    149 

ResourceExistsError: The specified blob already exists.
RequestId:85acda2e-401e-0080-5166-be4d32000000
Time:2020-11-19T11:23:28.7193393Z
ErrorCode:BlobAlreadyExists
Error:None
{code}
If I switch to adlfs 0.2.5 (old version which works for ds.dataset()), there 
error is different when I try to create a directory which already exists but I 
also cannot create any directory at all for some reason. I also tried to create 
an entirely new directory which definitely does not exist and ran into an error:
{code:python}
fs.mkdir("dev/testab1234123/2020/01/28/new.parquet", create_parents=True)
RuntimeError: Cannot create dev/testab1234123/2020/01/28/new.parquet.


---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)

----> 6 fs.mkdir("dev/test99999999999/2020/01/28/test.parquet", 
create_parents=True)

~/miniconda3/envs/old/lib/python3.8/site-packages/adlfs/core.py in mkdir(self, 
path, delimiter, exists_ok, **kwargs)
    561             else:
    562                 ## everything else
--> 563                 raise RuntimeError(f"Cannot create 
{container_name}{delimiter}{path}.")
    564         else:
    565             if container_name in self.ls("") and path:

RuntimeError: Cannot create dev/test99999999999/2020/01/28/test.parquet.
{code}
But I *am* able to read a dataset which I could *not* do with adlfs 0.5.5 (I 
get that error about a list of files instead of a dictionary using fs.find() 
with the latest version).

So this is bizarre. I can only read data (with ds.dataset()) with an old 
version of adlfs, and I can only write data with the newest version.

Even pq.read_table() will not work for me using the latest version of adlfs 
(0.5.5):
{code:python}
----> 7 table = pq.read_table(source="dev/testing10/evaluations", columns=None, 
filters=[('year', '==', '2020')], filesystem=fs)

~/miniconda3/envs/airflow/lib/python3.8/site-packages/pyarrow/parquet.py in 
read_table(source, columns, use_threads, metadata, use_pandas_metadata, 
memory_map, read_dictionary, filesystem, filters, buffer_size, partitioning, 
use_legacy_dataset, ignore_prefixes)
   1605             )
   1606         try:
-> 1607             dataset = _ParquetDatasetV2(
   1608                 source,
   1609                 filesystem=filesystem,

~/miniconda3/envs/airflow/lib/python3.8/site-packages/pyarrow/parquet.py in 
__init__(self, path_or_paths, filesystem, filters, partitioning, 
read_dictionary, buffer_size, memory_map, ignore_prefixes, **kwargs)
   1465                 infer_dictionary=True)
   1466 
-> 1467         self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
   1468                                    format=parquet_format,
   1469                                    partitioning=partitioning,

~/miniconda3/envs/airflow/lib/python3.8/site-packages/pyarrow/dataset.py in 
dataset(source, schema, format, filesystem, partitioning, partition_base_dir, 
exclude_invalid_files, ignore_prefixes)
    669     # TODO(kszucs): support InMemoryDataset for a table input
    670     if _is_path_like(source):
--> 671         return _filesystem_dataset(source, **kwargs)
    672     elif isinstance(source, (tuple, list)):
    673         if all(_is_path_like(elem) for elem in source):

~/miniconda3/envs/airflow/lib/python3.8/site-packages/pyarrow/dataset.py in 
_filesystem_dataset(source, schema, filesystem, partitioning, format, 
partition_base_dir, exclude_invalid_files, selector_ignore_prefixes)
    426         fs, paths_or_selector = _ensure_multiple_sources(source, 
filesystem)
    427     else:
--> 428         fs, paths_or_selector = _ensure_single_source(source, 
filesystem)
    429 
    430     options = FileSystemFactoryOptions(

~/miniconda3/envs/airflow/lib/python3.8/site-packages/pyarrow/dataset.py in 
_ensure_single_source(path, filesystem)
    402         paths_or_selector = [path]
    403     else:
--> 404         raise FileNotFoundError(path)
    405 
    406     return filesystem, paths_or_selector

FileNotFoundError: dev/testing10/evaluations
{code}
If I turn on use_legacy_dataset=True, then it works though and I am able to use 
the write_dataset feature. So this is definitely some interaction between the 
new dataset module and adlfs.
{code:python}
table = pq.read_table(source="dev/testing10/evaluations", columns=None, 
filters=[('year', '==', '2020')], filesystem=fs, use_legacy_dataset=True)
ds.write_dataset(table, 
                 base_dir="dev/adlfs-0.5.5", 
                 format="parquet", 
                 partitioning=ds.DirectoryPartitioning(pa.schema([("year", 
pa.int64()), ("month", pa.string()), ("day", pa.string())])),
                 schema=table.schema, 
                 filesystem=fs)
{code}
This does seem to create empty files for each partition as well, which is 
strange, and the files are named with a part- prefix now instead of a UUID.

dev/adlfs-0.5.5/2020/11/15/part-2.parquet
 dev/adlfs-0.5.5/2020/11/16/part-3.parquet

!ss2.PNG!

> [Python] Unable to read/write Parquet datasets with fsspec on Azure Blob
> ------------------------------------------------------------------------
>
>                 Key: ARROW-10517
>                 URL: https://issues.apache.org/jira/browse/ARROW-10517
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>    Affects Versions: 2.0.0
>         Environment: Ubuntu 18.04
>            Reporter: Lance Dacey
>            Priority: Major
>              Labels: azureblob, dataset, dataset-parquet-read, 
> dataset-parquet-write, fsspec
>         Attachments: ss.PNG, ss2.PNG
>
>
>  
> {code:python}
> # adal==1.2.5
> # adlfs==0.2.5
> # fsspec==0.7.4
> # pandas==1.1.3
> # pyarrow==2.0.0
> # azure-storage-blob==2.1.0
> # azure-storage-common==2.1.0
> import pyarrow.dataset as ds
> import fsspec
> from pyarrow.dataset import DirectoryPartitioning
> fs = fsspec.filesystem(protocol='abfs', 
>                        account_name=base.login, 
>                        account_key=base.password)
> ds.write_dataset(data=table, 
>                  base_dir="dev/test7", 
>                  basename_template=None, 
>                  format="parquet",
>                  partitioning=DirectoryPartitioning(pa.schema([("year", 
> pa.string()), ("month", pa.string()), ("day", pa.string())])), 
>                  schema=table.schema,
>                  filesystem=fs, 
>                 )
> {code}
>  I think this is due to early versions of adlfs having mkdir(). Although I 
> use write_to_dataset and write_table all of the time, so I am not sure why 
> this would be an issue.
> {code:python}
> ---------------------------------------------------------------------------
> RuntimeError                              Traceback (most recent call last)
> <ipython-input-40-bb38d83f896e> in <module>
>      13 
>      14 
> ---> 15 ds.write_dataset(data=table, 
>      16                  base_dir="dev/test7",
>      17                  basename_template=None,
> /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in 
> write_dataset(data, base_dir, basename_template, format, partitioning, 
> schema, filesystem, file_options, use_threads)
>     771     filesystem, _ = _ensure_fs(filesystem)
>     772 
> --> 773     _filesystemdataset_write(
>     774         data, base_dir, basename_template, schema,
>     775         filesystem, partitioning, file_options, use_threads,
> /opt/conda/lib/python3.8/site-packages/pyarrow/_dataset.pyx in 
> pyarrow._dataset._filesystemdataset_write()
> /opt/conda/lib/python3.8/site-packages/pyarrow/_fs.pyx in 
> pyarrow._fs._cb_create_dir()
> /opt/conda/lib/python3.8/site-packages/pyarrow/fs.py in create_dir(self, 
> path, recursive)
>     226     def create_dir(self, path, recursive):
>     227         # mkdir also raises FileNotFoundError when base directory is 
> not found
> --> 228         self.fs.mkdir(path, create_parents=recursive)
>     229 
>     230     def delete_dir(self, path):
> /opt/conda/lib/python3.8/site-packages/adlfs/core.py in mkdir(self, path, 
> delimiter, exists_ok, **kwargs)
>     561             else:
>     562                 ## everything else
> --> 563                 raise RuntimeError(f"Cannot create 
> {container_name}{delimiter}{path}.")
>     564         else:
>     565             if container_name in self.ls("") and path:
> RuntimeError: Cannot create dev/test7/2020/01/28.
> {code}
>  
> Next, if I try to read a dataset (keep in mind that this works with 
> read_table and ParquetDataset):
> {code:python}
> ds.dataset(source="dev/staging/evaluations", 
>            format="parquet", 
>            partitioning="hive",
>            exclude_invalid_files=False,
>            filesystem=fs
>           )
> {code}
>  
> This doesn't seem to respect the filesystem connected to Azure Blob.
> {code:python}
> ---------------------------------------------------------------------------
> FileNotFoundError                         Traceback (most recent call last)
> <ipython-input-41-4de65fe95db7> in <module>
> ----> 1 ds.dataset(source="dev/staging/evaluations", 
>       2            format="parquet",
>       3            partitioning="hive",
>       4            exclude_invalid_files=False,
>       5            filesystem=fs
> /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in dataset(source, 
> schema, format, filesystem, partitioning, partition_base_dir, 
> exclude_invalid_files, ignore_prefixes)
>     669     # TODO(kszucs): support InMemoryDataset for a table input
>     670     if _is_path_like(source):
> --> 671         return _filesystem_dataset(source, **kwargs)
>     672     elif isinstance(source, (tuple, list)):
>     673         if all(_is_path_like(elem) for elem in source):
> /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in 
> _filesystem_dataset(source, schema, filesystem, partitioning, format, 
> partition_base_dir, exclude_invalid_files, selector_ignore_prefixes)
>     426         fs, paths_or_selector = _ensure_multiple_sources(source, 
> filesystem)
>     427     else:
> --> 428         fs, paths_or_selector = _ensure_single_source(source, 
> filesystem)
>     429 
>     430     options = FileSystemFactoryOptions(
> /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in 
> _ensure_single_source(path, filesystem)
>     402         paths_or_selector = [path]
>     403     else:
> --> 404         raise FileNotFoundError(path)
>     405 
>     406     return filesystem, paths_or_selector
> FileNotFoundError: dev/staging/evaluations
> {code}
> This *does* work though when I list the blobs before passing them to 
> ds.dataset:
> {code:python}
> blobs = wasb.list_blobs(container_name="dev", prefix="staging/evaluations")
> dataset = ds.dataset(source=["dev/" + blob.name for blob in blobs], 
>                      format="parquet", 
>                      partitioning="hive",
>                      exclude_invalid_files=False,
>                      filesystem=fs)
> {code}
> Next, if I downgrade to pyarrow 1.0.1, I am able to read datasets (but there 
> is no write_datasets):
> {code:python}
> # adal==1.2.5
> # adlfs==0.2.5
> # azure-storage-blob==2.1.0
> # azure-storage-common==2.1.0
> # fsspec==0.7.4
> # pandas==1.1.3
> # pyarrow==1.0.1
> dataset = ds.dataset("dev/staging/evaluations", format="parquet", 
> filesystem=fs)
> dataset.to_table().to_pandas()
> {code}
> edit: 
> pyarrow 2.0.0
> fsspec 0.8.4
> adlfs v0.5.5
> pandas 1.1.4
> numpy 1.19.4
> azure.storage.blob 12.6.0
> {code:python}
> x = adlfs.AzureBlobFileSystem(account_name=name, account_key=key)
> type(x.find("dev/test", detail=True))
> list
> fs = fsspec.filesystem(protocol="abfs", account_name=name, account_key=key)
> type(fs.find("dev/test", detail=True))
> list
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to