Script 'mail_helper' called by obssrc
Hello community,
here is the log from the commit of package python-dask-expr for
openSUSE:Factory checked in at 2024-12-06 14:27:28
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-dask-expr (Old)
and /work/SRC/openSUSE:Factory/.python-dask-expr.new.28523 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-dask-expr"
Fri Dec 6 14:27:28 2024 rev:8 rq:1228730 version:1.1.20
Changes:
--------
--- /work/SRC/openSUSE:Factory/python-dask-expr/python-dask-expr.changes 2024-11-27 22:12:41.235848776 +0100
+++ /work/SRC/openSUSE:Factory/.python-dask-expr.new.28523/python-dask-expr.changes 2024-12-06 14:27:54.032317295 +0100
@@ -1,0 +2,20 @@
+Thu Dec 5 21:53:04 UTC 2024 - Ben Greiner <[email protected]>
+
+- Update to 1.1.20
+ * Fix value_counts with split_out != 1 (#1170) Patrick Hoefler
+ * Remove recursion in task spec (#1158) Florian Jetter
+ * Deprecated and remove from_legacy_dataframe usage (#1168)
+ Patrick Hoefler
+ * Remove from_dask_dataframe (#1167) Patrick Hoefler
+ * Avoid exponentially growing graph for Assign-Projection
+ combinations (#1164) Patrick Hoefler
+ * Introduce more caching when walking the expression (#1165)
+ Patrick Hoefler
+ * Use Taskspec fuse implementation (#1162) Florian Jetter
+ * Fix orphaned dependencies in Fused expression (#1163) Patrick
+ Hoefler
+- Add dask-expr-pr1173-blockwise.patch
+ * Use new blockwise unpack collection in array
+ * gh#dask/dask-expr#1173
+
+-------------------------------------------------------------------
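Note for downstream users of the entry points deprecated above: in 1.1.20, to_legacy_dataframe and from_legacy_dataframe become thin shims that emit a FutureWarning before delegating to the remaining implementation (see the _collection.py hunks further down). A minimal, stdlib-only sketch of that pattern, with an illustrative function name rather than the package source:

import warnings

def to_legacy_dataframe_shim():
    # Illustrative stand-in, not the package source: warn first, then fall
    # through to the legacy code path that still exists in 1.1.20.
    warnings.warn(
        "to_legacy_dataframe is deprecated and will be removed in a future release.",
        FutureWarning,
    )
    return "legacy collection would be built here"

# Downstream code that still needs the legacy path during migration can
# acknowledge the warning explicitly instead of letting it escape:
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    to_legacy_dataframe_shim()
assert any(issubclass(w.category, FutureWarning) for w in caught)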
Old:
----
dask_expr-1.1.19-gh.tar.gz
New:
----
dask-expr-pr1173-blockwise.patch
dask_expr-1.1.20-gh.tar.gz
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Other differences:
------------------
++++++ python-dask-expr.spec ++++++
--- /var/tmp/diff_new_pack.NWHnRb/_old 2024-12-06 14:27:54.576340207 +0100
+++ /var/tmp/diff_new_pack.NWHnRb/_new 2024-12-06 14:27:54.580340376 +0100
@@ -26,12 +26,14 @@
%bcond_with test
%endif
Name: python-dask-expr%{psuffix}
-Version: 1.1.19
+Version: 1.1.20
Release: 0
Summary: High Level Expressions for Dask
License: BSD-3-Clause
URL: https://github.com/dask/dask-expr
Source0: https://github.com/dask/dask-expr/archive/refs/tags/v%{version}.tar.gz#/dask_expr-%{version}-gh.tar.gz
+# PATCH-FIX-UPSTREAM dask-expr-pr1173-blockwise.patch gh#dask/dask-expr#1173
+Patch0: https://github.com/dask/dask-expr/pull/1173.patch#/dask-expr-pr1173-blockwise.patch
BuildRequires: %{python_module base >= 3.10}
BuildRequires: %{python_module pip}
BuildRequires: %{python_module setuptools >= 62.6}
@@ -39,7 +41,7 @@
BuildRequires: %{python_module wheel}
BuildRequires: fdupes
BuildRequires: python-rpm-macros
-Requires: python-dask = 2024.11.2
+Requires: python-dask = 2024.12.0
Requires: python-pandas >= 2
Requires: python-pyarrow >= 14.0.1
Provides: python-dask_expr = %{version}-%{release}
++++++ dask-expr-pr1173-blockwise.patch ++++++
From 7b6d178a31cdc52816908ba93aae3f6e3bbae680 Mon Sep 17 00:00:00 2001
From: James Bourbeau <[email protected]>
Date: Wed, 4 Dec 2024 10:41:15 -0600
Subject: [PATCH 1/2] Use new blockwise unpack collection in array
---
dask_expr/array/blockwise.py | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/dask_expr/array/blockwise.py b/dask_expr/array/blockwise.py
index 838539e0..23792434 100644
--- a/dask_expr/array/blockwise.py
+++ b/dask_expr/array/blockwise.py
@@ -18,7 +18,7 @@
from dask.array.utils import compute_meta
from dask.base import is_dask_collection
from dask.blockwise import blockwise as core_blockwise
-from dask.delayed import unpack_collections
+from dask.blockwise import _blockwise_unpack_collections_task_spec
from dask.tokenize import tokenize
from dask.utils import cached_property, funcname
@@ -142,7 +142,7 @@ def _layer(self):
for arg, ind in arginds:
if ind is None:
arg = normalize_arg(arg)
- arg, collections = unpack_collections(arg)
+ arg, collections = _blockwise_unpack_collections_task_spec(arg)
dependencies.extend(collections)
else:
if (
@@ -163,7 +163,7 @@ def _layer(self):
kwargs2 = {}
for k, v in self.kwargs.items():
v = normalize_arg(v)
- v, collections = unpack_collections(v)
+ v, collections = _blockwise_unpack_collections_task_spec(v)
dependencies.extend(collections)
kwargs2[k] = v
From fd6f081bcce4f36190b87ce26ae278cc3de71d04 Mon Sep 17 00:00:00 2001
From: James Bourbeau <[email protected]>
Date: Wed, 4 Dec 2024 10:46:47 -0600
Subject: [PATCH 2/2] Lint
---
dask_expr/array/blockwise.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/dask_expr/array/blockwise.py b/dask_expr/array/blockwise.py
index 23792434..4553f05c 100644
--- a/dask_expr/array/blockwise.py
+++ b/dask_expr/array/blockwise.py
@@ -17,8 +17,8 @@
)
from dask.array.utils import compute_meta
from dask.base import is_dask_collection
-from dask.blockwise import blockwise as core_blockwise
from dask.blockwise import _blockwise_unpack_collections_task_spec
+from dask.blockwise import blockwise as core_blockwise
from dask.tokenize import tokenize
from dask.utils import cached_property, funcname
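For context on the patch above: both the old and the new unpack helpers satisfy the same call-site contract, namely that an arbitrary argument is rewritten and the dask collections it depends on are returned alongside it. A minimal sketch of that contract using the old public helper (dask.delayed.unpack_collections), which the patch swaps for the task-spec-aware variant in dask.blockwise:

import dask
from dask.delayed import unpack_collections

# A blockwise keyword argument that happens to contain a dask collection.
payload = {"data": dask.delayed(42), "scale": 2}

# The helper rewrites the argument and hands back the collections it found,
# which the caller records as graph dependencies (as in _layer() above).
rewritten, collections = unpack_collections(payload)
print(len(collections))  # 1: the Delayed object is the extracted dependency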
++++++ dask_expr-1.1.19-gh.tar.gz -> dask_expr-1.1.20-gh.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/changes.md
new/dask-expr-1.1.20/changes.md
--- old/dask-expr-1.1.19/changes.md 2024-11-13 16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/changes.md 2024-12-03 23:40:36.000000000 +0100
@@ -1,5 +1,20 @@
## Dask-expr
+# v1.1.20
+
+- Fix value_counts with split_out != 1 (:pr:`1170`) `Patrick Hoefler`_
+- Remove recursion in task spec (:pr:`1158`) `Florian Jetter`_
+- Deprecated and remove from_legacy_dataframe usage (:pr:`1168`) `Patrick
Hoefler`_
+- Remove ``from_dask_dataframe`` (:pr:`1167`) `Patrick Hoefler`_
+- Avoid exponentially growing graph for Assign-Projection combinations
(:pr:`1164`) `Patrick Hoefler`_
+- Introduce more caching when walking the expression (:pr:`1165`) `Patrick
Hoefler`_
+- Use Taskspec fuse implementation (:pr:`1162`) `Florian Jetter`_
+- Fix orphaned dependencies in Fused expression (:pr:`1163`) `Patrick Hoefler`_
+
+# v1.1.19
+
+# v1.1.18
+
# v1.1.17
- Add support for Python 3.13 (:pr:`1160`) `James Bourbeau`_
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_collection.py
new/dask-expr-1.1.20/dask_expr/_collection.py
--- old/dask-expr-1.1.19/dask_expr/_collection.py 2024-11-13
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/_collection.py 2024-12-03
23:40:36.000000000 +0100
@@ -45,7 +45,7 @@
meta_series_constructor,
pyarrow_strings_enabled,
)
-from dask.delayed import delayed
+from dask.delayed import Delayed, delayed
from dask.utils import (
IndexCallable,
M,
@@ -66,6 +66,7 @@
from pandas.api.types import is_bool_dtype, is_datetime64_any_dtype,
is_numeric_dtype
from pandas.api.types import is_scalar as pd_is_scalar
from pandas.api.types import is_timedelta64_dtype
+from pandas.core.dtypes.common import is_extension_array_dtype
from pyarrow import fs as pa_fs
from tlz import first
@@ -159,9 +160,7 @@
if isinstance(other, FrameBase):
other = other.expr
elif isinstance(other, da.Array):
- other = from_dask_array(
- other, index=self.index.to_legacy_dataframe(), columns=self.columns
- )
+ other = from_dask_array(other, index=self.index, columns=self.columns)
if self.ndim == 1 and len(self.columns):
other = other[self.columns[0]]
@@ -1371,11 +1370,9 @@
Repartition(self, npartitions, divisions, force,
partition_size, freq)
)
- def to_dask_dataframe(self, *args, **kwargs) -> _Frame:
+ def to_legacy_dataframe(self, optimize: bool = True, **optimize_kwargs) ->
_Frame:
"""Convert to a legacy dask-dataframe collection
- WARNING: This API is deprecated. Please use `to_legacy_dataframe`.
-
Parameters
----------
optimize
@@ -1384,21 +1381,11 @@
Key-word arguments to pass through to `optimize`.
"""
warnings.warn(
- "`to_dask_dataframe` is deprecated, please use
`to_legacy_dataframe`.",
+ "to_legacy_dataframe is deprecated and will be removed in a future
release. "
+ "The legacy implementation as a whole is deprecated and will be
removed, making "
+ "this method unnecessary.",
FutureWarning,
)
- return self.to_legacy_dataframe(*args, **kwargs)
-
- def to_legacy_dataframe(self, optimize: bool = True, **optimize_kwargs) ->
_Frame:
- """Convert to a legacy dask-dataframe collection
-
- Parameters
- ----------
- optimize
- Whether to optimize the underlying `Expr` object before conversion.
- **optimize_kwargs
- Key-word arguments to pass through to `optimize`.
- """
df = self.optimize(**optimize_kwargs) if optimize else self
return new_dd_object(df.dask, df._name, df._meta, df.divisions)
@@ -1430,9 +1417,18 @@
-------
A Dask Array
"""
- return self.to_legacy_dataframe(optimize,
**optimize_kwargs).to_dask_array(
- lengths=lengths, meta=meta
- )
+ if lengths is True:
+ lengths = tuple(self.map_partitions(len,
enforce_metadata=False).compute())
+
+ arr = self.values
+
+ chunks = self._validate_chunks(arr, lengths)
+ arr._chunks = chunks
+
+ if meta is not None:
+ arr._meta = meta
+
+ return arr
@property
def values(self):
@@ -1442,7 +1438,13 @@
Operations that depend on shape information, like slicing or reshaping,
will not work.
"""
- return self.to_dask_array()
+ if is_extension_array_dtype(self._meta.values):
+ warnings.warn(
+ "Dask currently has limited support for converting pandas
extension dtypes "
+ f"to arrays. Converting {self._meta.values.dtype} to object
dtype.",
+ UserWarning,
+ )
+ return self.map_partitions(methods.values)
def __divmod__(self, other):
result = self.expr.__divmod__(other)
@@ -2460,15 +2462,38 @@
if lengths is True:
lengths = tuple(self.map_partitions(len).compute())
+ records = to_records(self)
- frame = self.to_legacy_dataframe()
- records = to_records(frame)
-
- chunks = frame._validate_chunks(records, lengths)
+ chunks = self._validate_chunks(records, lengths)
records._chunks = (chunks[0],)
return records
+ def _validate_chunks(self, arr, lengths):
+ from collections.abc import Sequence
+
+ from dask.array.core import normalize_chunks
+
+ if isinstance(lengths, Sequence):
+ lengths = tuple(lengths)
+
+ if len(lengths) != self.npartitions:
+ raise ValueError(
+ "The number of items in 'lengths' does not match the
number of "
+ f"partitions. {len(lengths)} != {self.npartitions}"
+ )
+
+ if self.ndim == 1:
+ chunks = normalize_chunks((lengths,))
+ else:
+ chunks = normalize_chunks((lengths, (len(self.columns),)))
+
+ return chunks
+ elif lengths is not None:
+ raise ValueError(f"Unexpected value for 'lengths': '{lengths}'")
+
+ return arr._chunks
+
def to_bag(self, index=False, format="tuple"):
"""Create a Dask Bag from a Series"""
from dask_expr.io.bag import to_bag
@@ -2498,7 +2523,13 @@
--------
dask_expr.from_delayed
"""
- return
self.to_legacy_dataframe().to_delayed(optimize_graph=optimize_graph)
+ if optimize_graph:
+ frame = self.optimize()
+ else:
+ frame = self
+ keys = frame.__dask_keys__()
+ graph = frame.__dask_graph__()
+ return [Delayed(k, graph) for k in keys]
def to_backend(self, backend: str | None = None, **kwargs):
"""Move to a new DataFrame backend
@@ -2812,9 +2843,7 @@
"Number of partitions do not match "
f"({v.npartitions} != {result.npartitions})"
)
- v = from_dask_array(
- v, index=result.index.to_legacy_dataframe(),
meta=result._meta
- )
+ v = from_dask_array(v, index=result.index, meta=result._meta)
else:
raise TypeError(f"Column assignment doesn't support type
{type(v)}")
args.extend([k, v])
@@ -4797,6 +4826,9 @@
def dtype(self):
return pd.Series(self._meta).dtype
+ def to_delayed(self, optimize_graph=True):
+ return super().to_delayed(optimize_graph=optimize_graph)[0]
+
def new_collection(expr):
"""Create new collection from an expr"""
@@ -5020,31 +5052,20 @@
)
-def from_dask_dataframe(*args, **kwargs) -> FrameBase:
+def from_legacy_dataframe(ddf: _Frame, optimize: bool = True) -> FrameBase:
"""Create a dask-expr collection from a legacy dask-dataframe collection
- WARNING: This API is deprecated. Please use `from_legacy_dataframe`.
-
Parameters
----------
optimize
Whether to optimize the graph before conversion.
"""
warnings.warn(
- "`from_dask_dataframe` is deprecated, please use
`from_legacy_dataframe`.",
+ "from_legacy_dataframe is deprecated and will be removed in a future
release. "
+ "The legacy implementation as a whole is deprecated and will be
removed, making "
+ "this method unnecessary.",
FutureWarning,
)
- return from_legacy_dataframe(*args, **kwargs)
-
-
-def from_legacy_dataframe(ddf: _Frame, optimize: bool = True) -> FrameBase:
- """Create a dask-expr collection from a legacy dask-dataframe collection
-
- Parameters
- ----------
- optimize
- Whether to optimize the graph before conversion.
- """
graph = ddf.dask
if optimize:
graph = ddf.__dask_optimize__(graph, ddf.__dask_keys__())
@@ -5100,12 +5121,9 @@
"""
from dask.dataframe.io import from_dask_array
- if isinstance(index, FrameBase):
- index = index.to_legacy_dataframe()
if columns is not None and isinstance(columns, list) and not len(columns):
columns = None
- df = from_dask_array(x, columns=columns, index=index, meta=meta)
- return from_legacy_dataframe(df, optimize=True)
+ return from_dask_array(x, columns=columns, index=index, meta=meta)
@dataframe_creation_dispatch.register_inplace("pandas")
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_core.py
new/dask-expr-1.1.20/dask_expr/_core.py
--- old/dask-expr-1.1.19/dask_expr/_core.py 2024-11-13 16:16:32.000000000
+0100
+++ new/dask-expr-1.1.20/dask_expr/_core.py 2024-12-03 23:40:36.000000000
+0100
@@ -160,17 +160,26 @@
raise RuntimeError(f"Serializing a {type(self)} object")
return type(self), tuple(self.operands)
- def _depth(self):
+ def _depth(self, cache=None):
"""Depth of the expression tree
Returns
-------
depth: int
"""
+ if cache is None:
+ cache = {}
if not self.dependencies():
return 1
else:
- return max(expr._depth() for expr in self.dependencies()) + 1
+ result = []
+ for expr in self.dependencies():
+ if expr._name in cache:
+ result.append(cache[expr._name])
+ else:
+ result.append(expr._depth(cache) + 1)
+ cache[expr._name] = result[-1]
+ return max(result)
def operand(self, key):
# Access an operand unambiguously
@@ -242,7 +251,7 @@
for i in range(self.npartitions)
}
- def rewrite(self, kind: str):
+ def rewrite(self, kind: str, rewritten):
"""Rewrite an expression
This leverages the ``._{kind}_down`` and ``._{kind}_up``
@@ -255,6 +264,9 @@
changed:
whether or not any change occured
"""
+ if self._name in rewritten:
+ return rewritten[self._name]
+
expr = self
down_name = f"_{kind}_down"
up_name = f"_{kind}_up"
@@ -291,7 +303,8 @@
changed = False
for operand in expr.operands:
if isinstance(operand, Expr):
- new = operand.rewrite(kind=kind)
+ new = operand.rewrite(kind=kind, rewritten=rewritten)
+ rewritten[operand._name] = new
if new._name != operand._name:
changed = True
else:
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_expr.py
new/dask-expr-1.1.20/dask_expr/_expr.py
--- old/dask-expr-1.1.19/dask_expr/_expr.py 2024-11-13 16:16:32.000000000
+0100
+++ new/dask-expr-1.1.20/dask_expr/_expr.py 2024-12-03 23:40:36.000000000
+0100
@@ -1899,6 +1899,16 @@
# don't squash if we are using a column that was previously
created
return
return Assign(*self.frame.operands, *self.operands[1:])
+ elif isinstance(self.frame, Projection) and isinstance(
+ self.frame.frame, Assign
+ ):
+ if self._check_for_previously_created_column(self.frame.frame):
+ return
+ new_columns = self.frame.operands[1].copy()
+ new_columns.extend(self.keys)
+ return Projection(
+ Assign(*self.frame.frame.operands, *self.operands[1:]),
new_columns
+ )
def _check_for_previously_created_column(self, child):
input_columns = []
@@ -3053,7 +3063,7 @@
return expr
# Manipulate Expression to make it more efficient
- expr = expr.rewrite(kind="tune")
+ expr = expr.rewrite(kind="tune", rewritten={})
if stage == "tuned-logical":
return expr
@@ -3193,15 +3203,14 @@
dependents[next._name] = set()
expr_mapping[next._name] = next
- for operand in next.operands:
- if isinstance(operand, Expr):
- stack.append(operand)
- if is_valid_blockwise_op(operand):
- if next._name in dependencies:
- dependencies[next._name].add(operand._name)
- dependents[operand._name].add(next._name)
- expr_mapping[operand._name] = operand
- expr_mapping[next._name] = next
+ for operand in next.dependencies():
+ stack.append(operand)
+ if is_valid_blockwise_op(operand):
+ if next._name in dependencies:
+ dependencies[next._name].add(operand._name)
+ dependents[operand._name].add(next._name)
+ expr_mapping[operand._name] = operand
+ expr_mapping[next._name] = next
# Traverse each "root" until we find a fusable sub-group.
# Here we use root to refer to a Blockwise Expr node that
@@ -3767,31 +3776,16 @@
def _task(self, name: Key, index: int) -> Task:
internal_tasks = []
- seen_keys = set()
- external_deps = set()
for _expr in self.exprs:
if self._broadcast_dep(_expr):
subname = (_expr._name, 0)
else:
subname = (_expr._name, index)
t = _expr._task(subname, subname[1])
+
assert t.key == subname
internal_tasks.append(t)
- seen_keys.add(subname)
- external_deps.update(t.dependencies)
- external_deps -= seen_keys
- dependencies = {dep: TaskRef(dep) for dep in external_deps}
- t = Task(
- name,
- Fused._execute_internal_graph,
- # Wrap the actual subgraph as a data node such that the tasks are
- # not erroneously parsed. The external task would otherwise carry
- # the internal keys as dependencies which is not satisfiable
- DataNode(None, internal_tasks),
- dependencies,
- (self.exprs[0]._name, index),
- )
- return t
+ return Task.fuse(*internal_tasks, key=name)
@staticmethod
def _execute_internal_graph(internal_tasks, dependencies, outkey):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_indexing.py
new/dask-expr-1.1.20/dask_expr/_indexing.py
--- old/dask-expr-1.1.19/dask_expr/_indexing.py 2024-11-13 16:16:32.000000000
+0100
+++ new/dask-expr-1.1.20/dask_expr/_indexing.py 2024-12-03 23:40:36.000000000
+0100
@@ -18,7 +18,8 @@
from pandas.api.types import is_bool_dtype
from pandas.errors import IndexingError
-from dask_expr._collection import Series, from_legacy_dataframe, new_collection
+from dask_expr import from_dask_array
+from dask_expr._collection import Series, new_collection
from dask_expr._expr import (
Blockwise,
MaybeAlignPartitions,
@@ -98,6 +99,8 @@
elif is_series_like(iindexer) and not
is_bool_dtype(iindexer.dtype):
return new_collection(LocList(self.obj, iindexer.values,
cindexer))
elif isinstance(iindexer, list) or is_arraylike(iindexer):
+ if len(iindexer) == 0:
+ return new_collection(LocEmpty(self.obj._meta, cindexer))
return new_collection(LocList(self.obj, iindexer, cindexer))
else:
# element should raise KeyError
@@ -132,9 +135,7 @@
return new_collection(Loc(frame, iindexer))
def _loc_array(self, iindexer, cindexer):
- iindexer_series = from_legacy_dataframe(
- iindexer.to_dask_dataframe("_",
self.obj.index.to_legacy_dataframe())
- )
+ iindexer_series = from_dask_array(iindexer, columns="_",
index=self.obj.index)
return self._loc_series(iindexer_series, cindexer,
check_alignment=False)
def _maybe_partial_time_string(self, iindexer, unit):
@@ -250,6 +251,26 @@
return self._layer_information[0]
+class LocEmpty(LocList):
+ _parameters = ["meta", "cindexer"]
+
+ def _lower(self):
+ return None
+
+ @functools.cached_property
+ def _meta(self):
+ if self.cindexer is None:
+ return self.operand("meta")
+ else:
+ return self.operand("meta").loc[:, self.cindexer]
+
+ @functools.cached_property
+ def _layer_information(self):
+ divisions = [None, None]
+ dsk = {(self._name, 0): DataNode((self._name, 0), self._meta)}
+ return dsk, divisions
+
+
class LocSlice(LocBase):
@functools.cached_property
def start(self):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_merge.py
new/dask-expr-1.1.20/dask_expr/_merge.py
--- old/dask-expr-1.1.19/dask_expr/_merge.py 2024-11-13 16:16:32.000000000
+0100
+++ new/dask-expr-1.1.20/dask_expr/_merge.py 2024-12-03 23:40:36.000000000
+0100
@@ -680,7 +680,7 @@
_barrier_key_left,
p2p_barrier,
token_left,
- transfer_keys_left,
+ *transfer_keys_left,
spec=DataFrameShuffleSpec(
id=token_left,
npartitions=self.npartitions,
@@ -698,7 +698,7 @@
_barrier_key_right,
p2p_barrier,
token_right,
- transfer_keys_right,
+ *transfer_keys_right,
spec=DataFrameShuffleSpec(
id=token_right,
npartitions=self.npartitions,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_reductions.py
new/dask-expr-1.1.20/dask_expr/_reductions.py
--- old/dask-expr-1.1.19/dask_expr/_reductions.py 2024-11-13
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/_reductions.py 2024-12-03
23:40:36.000000000 +0100
@@ -244,7 +244,8 @@
# Reset the index if we we used it for shuffling
if split_by_index:
- shuffled = SetIndexBlockwise(shuffled, split_by, True, None)
+ idx = list(self._meta.index.names) if split_by != ["index"] else
split_by
+ shuffled = SetIndexBlockwise(shuffled, idx, True, None)
# Convert back to Series if necessary
if self.shuffle_by_index is not False:
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_shuffle.py
new/dask-expr-1.1.20/dask_expr/_shuffle.py
--- old/dask-expr-1.1.19/dask_expr/_shuffle.py 2024-11-13 16:16:32.000000000
+0100
+++ new/dask-expr-1.1.20/dask_expr/_shuffle.py 2024-12-03 23:40:36.000000000
+0100
@@ -592,7 +592,7 @@
_barrier_key,
p2p_barrier,
token,
- transfer_keys,
+ *transfer_keys,
spec=DataFrameShuffleSpec(
id=shuffle_id,
npartitions=self.npartitions_out,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_version.py
new/dask-expr-1.1.20/dask_expr/_version.py
--- old/dask-expr-1.1.19/dask_expr/_version.py 2024-11-13 16:16:32.000000000
+0100
+++ new/dask-expr-1.1.20/dask_expr/_version.py 2024-12-03 23:40:36.000000000
+0100
@@ -26,9 +26,9 @@
# setup.py/versioneer.py will grep for the variable names, so they must
# each be defined on a line of their own. _version.py will just call
# get_keywords().
- git_refnames = " (tag: v1.1.19)"
- git_full = "735fc8904832c723680c5c9ef5f89e01e622a076"
- git_date = "2024-11-13 16:16:32 +0100"
+ git_refnames = " (tag: v1.1.20)"
+ git_full = "daef4a273acfb117ebcbbac610103372ebade7c6"
+ git_date = "2024-12-03 16:40:36 -0600"
keywords = {"refnames": git_refnames, "full": git_full, "date": git_date}
return keywords
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/_delayed.py
new/dask-expr-1.1.20/dask_expr/io/_delayed.py
--- old/dask-expr-1.1.19/dask_expr/io/_delayed.py 2024-11-13
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/io/_delayed.py 2024-12-03
23:40:36.000000000 +0100
@@ -4,13 +4,14 @@
from collections.abc import Iterable
from typing import TYPE_CHECKING
+import pandas as pd
from dask._task_spec import Alias, Task, TaskRef
from dask.dataframe.dispatch import make_meta
-from dask.dataframe.utils import check_meta
+from dask.dataframe.utils import check_meta, pyarrow_strings_enabled
from dask.delayed import Delayed, delayed
from dask.typing import Key
-from dask_expr._expr import DelayedsExpr, PartitionsFiltered
+from dask_expr._expr import ArrowStringConversion, DelayedsExpr,
PartitionsFiltered
from dask_expr._util import _tokenize_deterministic
from dask_expr.io import BlockwiseIO
@@ -141,8 +142,12 @@
from dask_expr._collection import new_collection
- return new_collection(
- FromDelayed(
- DelayedsExpr(*dfs), make_meta(meta), divisions, verify_meta, None,
prefix
- )
+ result = FromDelayed(
+ DelayedsExpr(*dfs), make_meta(meta), divisions, verify_meta, None,
prefix
)
+ if pyarrow_strings_enabled() and any(
+ pd.api.types.is_object_dtype(dtype)
+ for dtype in (result.dtypes.values if result.ndim == 2 else
[result.dtypes])
+ ):
+ return new_collection(ArrowStringConversion(result))
+ return new_collection(result)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/bag.py
new/dask-expr-1.1.20/dask_expr/io/bag.py
--- old/dask-expr-1.1.19/dask_expr/io/bag.py 2024-11-13 16:16:32.000000000
+0100
+++ new/dask-expr-1.1.20/dask_expr/io/bag.py 2024-12-03 23:40:36.000000000
+0100
@@ -1,4 +1,42 @@
+from dask.dataframe.io.io import _df_to_bag
+from dask.tokenize import tokenize
+
+from dask_expr import FrameBase
+
+
def to_bag(df, index=False, format="tuple"):
- from dask.dataframe.io import to_bag as _to_bag
+ """Create Dask Bag from a Dask DataFrame
+
+ Parameters
+ ----------
+ index : bool, optional
+ If True, the elements are tuples of ``(index, value)``, otherwise
+ they're just the ``value``. Default is False.
+ format : {"tuple", "dict", "frame"}, optional
+ Whether to return a bag of tuples, dictionaries, or
+ dataframe-like objects. Default is "tuple". If "frame",
+ the original partitions of ``df`` will not be transformed
+ in any way.
+
+
+ Examples
+ --------
+ >>> bag = df.to_bag() # doctest: +SKIP
+ """
+ from dask.bag.core import Bag
+
+ df = df.optimize()
- return _to_bag(df.to_legacy_dataframe(), index=index, format=format)
+ if not isinstance(df, FrameBase):
+ raise TypeError("df must be either DataFrame or Series")
+ name = "to_bag-" + tokenize(df._name, index, format)
+ if format == "frame":
+ dsk = df.dask
+ name = df._name
+ else:
+ dsk = {
+ (name, i): (_df_to_bag, block, index, format)
+ for (i, block) in enumerate(df.__dask_keys__())
+ }
+ dsk.update(df.__dask_graph__())
+ return Bag(dsk, name, df.npartitions)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/csv.py
new/dask-expr-1.1.20/dask_expr/io/csv.py
--- old/dask-expr-1.1.19/dask_expr/io/csv.py 2024-11-13 16:16:32.000000000
+0100
+++ new/dask-expr-1.1.20/dask_expr/io/csv.py 2024-12-03 23:40:36.000000000
+0100
@@ -280,7 +280,7 @@
from dask.dataframe.io.csv import to_csv as _to_csv
return _to_csv(
- df.to_legacy_dataframe(),
+ df.optimize(),
filename,
single_file=single_file,
encoding=encoding,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/hdf.py
new/dask-expr-1.1.20/dask_expr/io/hdf.py
--- old/dask-expr-1.1.19/dask_expr/io/hdf.py 2024-11-13 16:16:32.000000000
+0100
+++ new/dask-expr-1.1.20/dask_expr/io/hdf.py 2024-12-03 23:40:36.000000000
+0100
@@ -1,6 +1,3 @@
-from dask_expr import from_legacy_dataframe
-
-
def read_hdf(
pattern,
key,
@@ -14,7 +11,7 @@
):
from dask.dataframe.io import read_hdf as _read_hdf
- df = _read_hdf(
+ return _read_hdf(
pattern,
key,
start=start,
@@ -25,7 +22,6 @@
lock=lock,
mode=mode,
)
- return from_legacy_dataframe(df)
def to_hdf(
@@ -130,7 +126,7 @@
from dask.dataframe.io import to_hdf as _to_hdf
return _to_hdf(
- df.to_legacy_dataframe(),
+ df.optimize(),
path,
key,
mode=mode,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/io.py
new/dask-expr-1.1.20/dask_expr/io/io.py
--- old/dask-expr-1.1.19/dask_expr/io/io.py 2024-11-13 16:16:32.000000000
+0100
+++ new/dask-expr-1.1.20/dask_expr/io/io.py 2024-12-03 23:40:36.000000000
+0100
@@ -6,7 +6,7 @@
import numpy as np
import pyarrow as pa
-from dask._task_spec import Task
+from dask._task_spec import List, Task
from dask.dataframe import methods
from dask.dataframe._pyarrow import to_pyarrow_string
from dask.dataframe.core import apply_and_enforce, is_dataframe_like, make_meta
@@ -135,7 +135,7 @@
bucket = self._fusion_buckets[index]
# FIXME: This will likely require a wrapper
return Task(
- name, methods.concat, [expr._filtered_task(name, i) for i in
bucket]
+ name, methods.concat, List(*(expr._filtered_task(name, i) for i in
bucket))
)
@functools.cached_property
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/json.py
new/dask-expr-1.1.20/dask_expr/io/json.py
--- old/dask-expr-1.1.19/dask_expr/io/json.py 2024-11-13 16:16:32.000000000
+0100
+++ new/dask-expr-1.1.20/dask_expr/io/json.py 2024-12-03 23:40:36.000000000
+0100
@@ -1,7 +1,6 @@
import pandas as pd
from dask.dataframe.utils import insert_meta_param_description
-from dask_expr import from_legacy_dataframe
from dask_expr._backends import dataframe_creation_dispatch
@@ -98,7 +97,7 @@
"""
from dask.dataframe.io.json import read_json
- df = read_json(
+ return read_json(
url_path,
orient=orient,
lines=lines,
@@ -114,7 +113,6 @@
path_converter=path_converter,
**kwargs,
)
- return from_legacy_dataframe(df)
def to_json(
@@ -172,7 +170,7 @@
from dask.dataframe.io.json import to_json
return to_json(
- df.to_legacy_dataframe(),
+ df,
url_path,
orient=orient,
lines=lines,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/orc.py
new/dask-expr-1.1.20/dask_expr/io/orc.py
--- old/dask-expr-1.1.19/dask_expr/io/orc.py 2024-11-13 16:16:32.000000000
+0100
+++ new/dask-expr-1.1.20/dask_expr/io/orc.py 2024-12-03 23:40:36.000000000
+0100
@@ -1,4 +1,3 @@
-from dask_expr import from_legacy_dataframe
from dask_expr._backends import dataframe_creation_dispatch
@@ -48,7 +47,7 @@
"""
from dask.dataframe.io import read_orc as _read_orc
- df = _read_orc(
+ return _read_orc(
path,
engine=engine,
columns=columns,
@@ -57,7 +56,6 @@
aggregate_files=aggregate_files,
storage_options=storage_options,
)
- return from_legacy_dataframe(df)
def to_orc(
@@ -72,7 +70,7 @@
from dask.dataframe.io import to_orc as _to_orc
return _to_orc(
- df.to_legacy_dataframe(),
+ df.optimize(),
path,
engine=engine,
write_index=write_index,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/parquet.py
new/dask-expr-1.1.20/dask_expr/io/parquet.py
--- old/dask-expr-1.1.19/dask_expr/io/parquet.py 2024-11-13
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/io/parquet.py 2024-12-03
23:40:36.000000000 +0100
@@ -593,7 +593,7 @@
# Engine-specific initialization steps to write the dataset.
# Possibly create parquet metadata, and load existing stuff if appending
i_offset, fmd, metadata_file_exists, extra_write_kwargs =
engine.initialize_write(
- df.to_legacy_dataframe(),
+ df,
fs,
path,
append=append,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/sql.py
new/dask-expr-1.1.20/dask_expr/io/sql.py
--- old/dask-expr-1.1.19/dask_expr/io/sql.py 2024-11-13 16:16:32.000000000
+0100
+++ new/dask-expr-1.1.20/dask_expr/io/sql.py 2024-12-03 23:40:36.000000000
+0100
@@ -1,6 +1,3 @@
-from dask_expr import from_legacy_dataframe
-
-
def read_sql(sql, con, index_col, **kwargs):
"""
Read SQL query or database table into a DataFrame.
@@ -36,8 +33,7 @@
"""
from dask.dataframe.io.sql import read_sql
- df = read_sql(sql, con, index_col, **kwargs)
- return from_legacy_dataframe(df)
+ return read_sql(sql, con, index_col, **kwargs)
def read_sql_table(
@@ -122,7 +118,7 @@
"""
from dask.dataframe.io.sql import read_sql_table as _read_sql_table
- df = _read_sql_table(
+ return _read_sql_table(
table_name,
con,
index_col,
@@ -137,7 +133,6 @@
engine_kwargs=engine_kwargs,
**kwargs,
)
- return from_legacy_dataframe(df)
def read_sql_query(
@@ -210,7 +205,7 @@
"""
from dask.dataframe.io.sql import read_sql_query as _read_sql_query
- df = _read_sql_query(
+ return _read_sql_query(
sql,
con,
index_col,
@@ -223,7 +218,6 @@
engine_kwargs=engine_kwargs,
**kwargs,
)
- return from_legacy_dataframe(df)
def to_sql(
@@ -354,7 +348,7 @@
from dask.dataframe.io.sql import to_sql as _to_sql
return _to_sql(
- df.to_legacy_dataframe(),
+ df,
name=name,
uri=uri,
schema=schema,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/dask-expr-1.1.19/dask_expr/io/tests/test_distributed.py
new/dask-expr-1.1.20/dask_expr/io/tests/test_distributed.py
--- old/dask-expr-1.1.19/dask_expr/io/tests/test_distributed.py 2024-11-13
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/io/tests/test_distributed.py 2024-12-03
23:40:36.000000000 +0100
@@ -63,4 +63,4 @@
df = read_parquet(tmpdir, filesystem=filesystem)
from distributed.protocol import dumps
- assert len(b"".join(dumps(df.optimize().dask))) <= 9000
+ assert len(b"".join(dumps(df.optimize().dask))) <= 9100
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/tests/test_io.py
new/dask-expr-1.1.20/dask_expr/io/tests/test_io.py
--- old/dask-expr-1.1.19/dask_expr/io/tests/test_io.py 2024-11-13
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/io/tests/test_io.py 2024-12-03
23:40:36.000000000 +0100
@@ -14,7 +14,6 @@
DataFrame,
from_array,
from_dask_array,
- from_dask_dataframe,
from_dict,
from_legacy_dataframe,
from_map,
@@ -231,29 +230,21 @@
@pytest.mark.parametrize("optimize", [True, False])
def test_from_legacy_dataframe(optimize):
ddf = dd.from_dict({"a": range(100)}, npartitions=10)
- df = from_legacy_dataframe(ddf, optimize=optimize)
+ with pytest.warns(FutureWarning, match="is deprecated"):
+ df = from_legacy_dataframe(ddf, optimize=optimize)
assert isinstance(df.expr, Expr)
assert_eq(df, ddf)
- # Check deprecated API
- with pytest.warns(FutureWarning, match="deprecated"):
- df2 = from_dask_dataframe(ddf, optimize=optimize)
- assert_eq(df, df2)
-
@pytest.mark.parametrize("optimize", [True, False])
def test_to_legacy_dataframe(optimize):
pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
df = from_pandas(pdf, npartitions=2)
- ddf = df.to_legacy_dataframe(optimize=optimize)
+ with pytest.warns(FutureWarning, match="is deprecated"):
+ ddf = df.to_legacy_dataframe(optimize=optimize)
assert isinstance(ddf, dd.core.DataFrame)
assert_eq(df, ddf)
- # Check deprecated API
- with pytest.warns(FutureWarning, match="deprecated"):
- ddf2 = df.to_dask_dataframe(optimize=optimize)
- assert_eq(ddf, ddf2)
-
@pytest.mark.parametrize("optimize", [True, False])
def test_to_dask_array(optimize):
@@ -470,3 +461,41 @@
obj = meta.schema
assert normalizer(obj) == normalizer(obj)
+
+
[email protected]("lengths", [[2, 2], True])
+def test_to_records_with_lengths(lengths):
+ pytest.importorskip("dask.array")
+ from dask.array.utils import assert_eq
+
+ df = pd.DataFrame(
+ {"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]},
+ index=pd.Index([1.0, 2.0, 3.0, 4.0], name="ind"),
+ )
+ ddf = dd.from_pandas(df, 2)
+
+ result = ddf.to_records(lengths=lengths)
+ assert_eq(df.to_records(), result, check_type=False) # TODO: make
check_type pass
+
+ assert isinstance(result, da.Array)
+
+ expected_chunks = ((2, 2),)
+
+ assert result.chunks == expected_chunks
+
+
+def test_to_bag():
+ a = pd.DataFrame(
+ {"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]},
+ index=pd.Index([1.0, 2.0, 3.0, 4.0], name="ind"),
+ )
+ ddf = dd.from_pandas(a, 2)
+
+ assert ddf.to_bag().compute() == list(a.itertuples(False))
+ assert ddf.to_bag(True).compute() == list(a.itertuples(True))
+ assert ddf.to_bag(format="dict").compute() == [
+ {"x": "a", "y": 2},
+ {"x": "b", "y": 3},
+ {"x": "c", "y": 4},
+ {"x": "d", "y": 5},
+ ]
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/tests/test_collection.py
new/dask-expr-1.1.20/dask_expr/tests/test_collection.py
--- old/dask-expr-1.1.19/dask_expr/tests/test_collection.py 2024-11-13
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/tests/test_collection.py 2024-12-03
23:40:36.000000000 +0100
@@ -2651,6 +2651,44 @@
assert_eq(ddf.melt(**kwargs), pdf.melt(**kwargs), check_index=False)
+def test_assign_projection_mix():
+ data = {
+ "date": [
+ "2024-03-22 18:13:36.801000",
+ "2024-03-22 18:14:11.457000",
+ "2024-04-02 06:05:01.658000",
+ "2024-04-02 06:05:04.870000",
+ "2024-04-03 06:11:30.202000",
+ ],
+ "code": [1.0, 3.0, 6.0, 6.0, 8.0],
+ "first": pd.NaT,
+ "first_2": pd.NaT,
+ "second": pd.NaT,
+ "second_2": pd.NaT,
+ "third": pd.NaT,
+ }
+ df = pd.DataFrame(data)
+ df["date"] = pd.to_datetime(df["date"])
+
+ df = from_pandas(df)
+
+ def apply_func(x):
+ return x
+
+ event_columns = {1: ["first", "first_2"], 2: ["second", "second_2"], 3:
["third"]}
+
+ for event_code, columns in event_columns.items():
+ mask = df["code"] == event_code
+ df[columns[0]] = df[columns[0]].mask(cond=(mask), other=df["date"])
+ if len(columns) == 2:
+ df[columns[1]] = df[columns[1]].mask(cond=(mask), other=df["date"])
+ df[columns[1]] = df[columns[1]].apply(apply_func)
+
+ df = df.drop(columns=["code", "date"])
+ result = df.optimize(fuse=False)
+ assert result.expr._depth() == 13.0 # this grew exponentially previously
+
+
def test_dropna_merge(df, pdf):
dropped_na = df.dropna(subset=["x"])
result = dropped_na.merge(dropped_na, on="x")
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/tests/test_groupby.py
new/dask-expr-1.1.20/dask_expr/tests/test_groupby.py
--- old/dask-expr-1.1.19/dask_expr/tests/test_groupby.py 2024-11-13
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/tests/test_groupby.py 2024-12-03
23:40:36.000000000 +0100
@@ -157,6 +157,13 @@
assert_eq(agg, expect)
+def test_value_counts_split_out(pdf):
+ df = from_pandas(pdf, npartitions=10)
+ result = df.groupby("x").y.value_counts(split_out=True)
+ expected = pdf.groupby("x").y.value_counts()
+ assert_eq(result, expected)
+
+
def test_unique(df, pdf):
result = df.groupby("x")["y"].unique()
expected = pdf.groupby("x")["y"].unique()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/tests/test_indexing.py
new/dask-expr-1.1.20/dask_expr/tests/test_indexing.py
--- old/dask-expr-1.1.19/dask_expr/tests/test_indexing.py 2024-11-13
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/tests/test_indexing.py 2024-12-03
23:40:36.000000000 +0100
@@ -35,15 +35,6 @@
df.iloc[(1, 2, 3)]
-def test_loc(df, pdf):
- assert_eq(df.loc[:, "x"], pdf.loc[:, "x"])
- assert_eq(df.loc[:, ["x"]], pdf.loc[:, ["x"]])
- assert_eq(df.loc[:, []], pdf.loc[:, []])
-
- assert_eq(df.loc[df.y == 20, "x"], pdf.loc[pdf.y == 20, "x"])
- assert_eq(df.loc[df.y == 20, ["x"]], pdf.loc[pdf.y == 20, ["x"]])
-
-
def test_loc_slice(pdf, df):
pdf.columns = [10, 20]
df.columns = [10, 20]
@@ -86,6 +77,12 @@
def test_loc(df, pdf):
+ assert_eq(df.loc[:, "x"], pdf.loc[:, "x"])
+ assert_eq(df.loc[:, ["x"]], pdf.loc[:, ["x"]])
+ assert_eq(df.loc[:, []], pdf.loc[:, []])
+
+ assert_eq(df.loc[df.y == 20, "x"], pdf.loc[pdf.y == 20, "x"])
+ assert_eq(df.loc[df.y == 20, ["x"]], pdf.loc[pdf.y == 20, ["x"]])
assert df.loc[3:8].divisions[0] == 3
assert df.loc[3:8].divisions[-1] == 8
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/tests/test_reductions.py
new/dask-expr-1.1.20/dask_expr/tests/test_reductions.py
--- old/dask-expr-1.1.19/dask_expr/tests/test_reductions.py 2024-11-13
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/tests/test_reductions.py 2024-12-03
23:40:36.000000000 +0100
@@ -146,10 +146,10 @@
@pytest.mark.parametrize(
- "split_every,expect_tasks", [(False, 53), (None, 57), (5, 57), (2, 73)]
+ "split_every, expect_tasks", [(False, 53), (None, 57), (5, 57), (2, 73)]
)
def test_dataframe_mode_split_every(pdf, df, split_every, expect_tasks):
- assert_eq(df.to_legacy_dataframe().mode(split_every=split_every),
pdf.mode())
+ assert_eq(df.mode(split_every=split_every), pdf.mode())
q = df.mode(split_every=split_every).optimize(fuse=False)
assert len(q.__dask_graph__()) == expect_tasks
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/dask-expr-1.1.19/pyproject.toml
new/dask-expr-1.1.20/pyproject.toml
--- old/dask-expr-1.1.19/pyproject.toml 2024-11-13 16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/pyproject.toml 2024-12-03 23:40:36.000000000 +0100
@@ -26,7 +26,7 @@
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
- "dask == 2024.11.2",
+ "dask == 2024.12.0",
"pyarrow>=14.0.1",
"pandas >= 2",
]
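As a quick smoke test of the update, the behaviours covered by the new upstream tests (groupby value_counts with split_out, and the reimplemented to_bag) can be exercised directly. A minimal sketch, assuming dask 2024.12.0 with the dask-expr backend and pandas 2; this mirrors test_value_counts_split_out and test_to_bag above and is not part of the package:

import pandas as pd
import dask.dataframe as dd  # backed by dask-expr in dask >= 2024.12.0

pdf = pd.DataFrame({"x": [1, 2, 1, 2, 1, 3], "y": [1, 1, 2, 2, 3, 3]})
ddf = dd.from_pandas(pdf, npartitions=3)

# value_counts with split_out != 1 (fixed upstream in 1.1.20, gh#dask/dask-expr#1170)
counts = ddf.groupby("x").y.value_counts(split_out=True)
expected = pdf.groupby("x").y.value_counts()
assert counts.compute().sort_index().to_dict() == expected.sort_index().to_dict()

# to_bag no longer round-trips through the legacy dataframe implementation
bag = ddf.to_bag(format="dict")
assert {"x": 1, "y": 1} in bag.compute()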