tvalentyn opened a new issue, #31855:
URL: https://github.com/apache/beam/issues/31855
### What happened?
I was experimenting with the Beam DataFrame API through Beam Notebooks and the
interactive runner and wasn't able to use `fillna` on an individual column. Here
is a repro on a dataframe with two columns:
```
%%writefile numbers.csv
col1,col2
1,1
NaN,1
-1,1
```
```
import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
pipeline = beam.Pipeline(InteractiveRunner())
beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('numbers.csv')
beam_df['col1'] = beam_df['col1'].fillna(0)
ib.collect(beam_df)
```
This fails with `ValueError: "[ConstantExpression[constant_int_140226804834624]]:140226804986784" requires a pipeline to be specified as there are no deferred inputs.`
A rewritten version:
```
c1 = beam_df['col1']
c1 = c1.fillna(0)
ib.collect(c1)
```
also fails with the same error.
Both snippets run without issues under pandas or Dask.
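For comparison, here is the pandas equivalent, which produces the expected result (a minimal sketch; the printed output assumes pandas' default `read_csv` NaN parsing of the `numbers.csv` above):
```
import pandas as pd

df = pd.read_csv('numbers.csv')
df['col1'] = df['col1'].fillna(0)
print(df)
#    col1  col2
# 0   1.0     1
# 1   0.0     1
# 2  -1.0     1
```
A possible (untested) workaround might be calling `fillna` with a column-to-value dict on the whole frame, e.g. `beam_df.fillna({'col1': 0})`, assuming Beam's deferred `fillna` accepts a dict the way pandas does, since that avoids the column-assignment path.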
<details>
<summary>Full Stacktrace</summary>

```
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[62], line 16
     14 beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('numbers.csv')
     15 beam_df['col1'] = beam_df['col1'].fillna(0)
---> 16 ib.collect(beam_df)  # fails with ValueError: "[ConstantExpression[constant_int_140226804834624]]:140226804986784" requires a pipeline to be specified as there are no deferred inputs.
     18 c1 = beam_df['col1']
     19 c1 = c1.fillna(0)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/interactive/utils.py:277, in progress_indicated.<locals>.run_within_progress_indicator(*args, **kwargs)
    274 @functools.wraps(func)
    275 def run_within_progress_indicator(*args, **kwargs):
    276   with ProgressIndicator(f'Processing... {func.__name__}', 'Done.'):
--> 277     return func(*args, **kwargs)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/interactive/interactive_beam.py:906, in collect(pcoll, n, duration, include_window_info)
    902 # Remember the element type so we can make an informed decision on how to
    903 # collect the result in elements_to_df.
    904 if isinstance(pcoll, DeferredBase):
    905   # Get the proxy so we can get the output shape of the DataFrame.
--> 906   pcoll, element_type = deferred_df_to_pcollection(pcoll)
    907   watch({'anonymous_pcollection_{}'.format(id(pcoll)): pcoll})
    908 else:

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/interactive/utils.py:313, in deferred_df_to_pcollection(df)
    310   cache.replace_with_cached(df._expr)
    312 proxy = df._expr.proxy()
--> 313 return to_pcollection(df, yield_elements='pandas', label=str(df._expr)), proxy

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/convert.py:261, in to_pcollection(label, always_return_tuple, yield_elements, include_indexes, pipeline, *dataframes)
    257 new_dataframes = [
    258     df for df in dataframes if df._expr._id not in TO_PCOLLECTION_CACHE
    259 ]
    260 if len(new_dataframes):
--> 261   new_results = {p: extract_input(p)
    262                  for p in placeholders
    263                  } | label >> transforms._DataframeExpressionsTransform({
    264       ix: df._expr
    265       for (ix, df) in enumerate(new_dataframes)
    266   })  # type: Dict[Any, pvalue.PCollection]
    268 TO_PCOLLECTION_CACHE.update(
    269     {new_dataframes[ix]._expr._id: pc
    270      for ix, pc in new_results.items()})
    272 raw_results = {
    273     ix: TO_PCOLLECTION_CACHE[df._expr._id]
    274     for ix,
    275     df in enumerate(dataframes)
    276 }

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/transforms/ptransform.py:1110, in _NamedPTransform.__ror__(self, pvalueish, _unused)
   1109 def __ror__(self, pvalueish, _unused=None):
-> 1110   return self.transform.__ror__(pvalueish, self.label)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/transforms/ptransform.py:623, in PTransform.__ror__(self, left, label)
    621 pvalueish = _SetInputPValues().visit(pvalueish, replacements)
    622 self.pipeline = p
--> 623 result = p.apply(self, pvalueish, label)
    624 if deferred:
    625   return result

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/pipeline.py:679, in Pipeline.apply(self, transform, pvalueish, label)
    677 old_label, transform.label = transform.label, label
    678 try:
--> 679   return self.apply(transform, pvalueish)
    680 finally:
    681   transform.label = old_label

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/pipeline.py:732, in Pipeline.apply(self, transform, pvalueish, label)
    729 if type_options.pipeline_type_check:
    730   transform.type_check_inputs(pvalueish)
--> 732 pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
    734 if type_options is not None and type_options.pipeline_type_check:
    735   transform.type_check_outputs(pvalueish_result)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/interactive/interactive_runner.py:131, in InteractiveRunner.apply(self, transform, pvalueish, options)
    129 def apply(self, transform, pvalueish, options):
    130   # TODO(qinyeli, BEAM-646): Remove runner interception of apply.
--> 131   return self._underlying_runner.apply(transform, pvalueish, options)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/runner.py:203, in PipelineRunner.apply(self, transform, input, options)
    197 def apply(self,
    198           transform,  # type: PTransform
    199           input,  # type: Optional[pvalue.PValue]
    200           options  # type: PipelineOptions
    201          ):
    202   # TODO(robertwb): Remove indirection once internal references are fixed.
--> 203   return self.apply_PTransform(transform, input, options)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/runner.py:207, in PipelineRunner.apply_PTransform(self, transform, input, options)
    205 def apply_PTransform(self, transform, input, options):
    206   # TODO(robertwb): Remove indirection once internal references are fixed.
--> 207   return transform.expand(input)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:151, in _DataframeExpressionsTransform.expand(self, inputs)
    150 def expand(self, inputs):
--> 151   return self._apply_deferred_ops(inputs, self._outputs)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:470, in _DataframeExpressionsTransform._apply_deferred_ops(self, inputs, outputs)
    467   return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
    469 # Now we can compute and return the result.
--> 470 return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:470, in <dictcomp>(.0)
    467   return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
    469 # Now we can compute and return the result.
--> 470 return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:562, in _memoize.<locals>.wrapper(*args, **kwargs)
    560 key = args, tuple(sorted(kwargs.items()))
    561 if key not in cache:
--> 562   cache[key] = f(*args, **kwargs)
    563 return cache[key]

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:432, in _DataframeExpressionsTransform._apply_deferred_ops.<locals>.expr_to_pcoll(expr)
    430   return inputs[expr]
    431 else:
--> 432   return stage_to_result(expr_to_stage(expr))[expr._id]

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:562, in _memoize.<locals>.wrapper(*args, **kwargs)
    560 key = args, tuple(sorted(kwargs.items()))
    561 if key not in cache:
--> 562   cache[key] = f(*args, **kwargs)
    563 return cache[key]

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:424, in _DataframeExpressionsTransform._apply_deferred_ops.<locals>.stage_to_result(stage)
    422 @_memoize
    423 def stage_to_result(stage):
--> 424   return {expr._id: expr_to_pcoll(expr)
    425           for expr in stage.inputs} | ComputeStage(stage)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:424, in <dictcomp>(.0)
    422 @_memoize
    423 def stage_to_result(stage):
--> 424   return {expr._id: expr_to_pcoll(expr)
    425           for expr in stage.inputs} | ComputeStage(stage)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:562, in _memoize.<locals>.wrapper(*args, **kwargs)
    560 key = args, tuple(sorted(kwargs.items()))
    561 if key not in cache:
--> 562   cache[key] = f(*args, **kwargs)
    563 return cache[key]

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:432, in _DataframeExpressionsTransform._apply_deferred_ops.<locals>.expr_to_pcoll(expr)
    430   return inputs[expr]
    431 else:
--> 432   return stage_to_result(expr_to_stage(expr))[expr._id]

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:562, in _memoize.<locals>.wrapper(*args, **kwargs)
    560 key = args, tuple(sorted(kwargs.items()))
    561 if key not in cache:
--> 562   cache[key] = f(*args, **kwargs)
    563 return cache[key]

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:424, in _DataframeExpressionsTransform._apply_deferred_ops.<locals>.stage_to_result(stage)
    422 @_memoize
    423 def stage_to_result(stage):
--> 424   return {expr._id: expr_to_pcoll(expr)
    425           for expr in stage.inputs} | ComputeStage(stage)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/transforms/ptransform.py:602, in PTransform.__ror__(self, left, label)
    600   p = self.pipeline
    601 else:
--> 602   raise ValueError(
    603       '"%s" requires a pipeline to be specified '
    604       'as there are no deferred inputs.' % self.label)
    605 else:
    606   p = self.pipeline or pipelines[0]

ValueError: "[ConstantExpression[constant_int_140226803777984]]:140226803785616" requires a pipeline to be specified as there are no deferred inputs.
```
</details>
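From the trace, the raising frame (`ptransform.py:602` in `PTransform.__ror__`) fires whenever a transform is applied with `|` to an input that carries no deferred PCollections and no explicit pipeline. The same ValueError can be reproduced outside the DataFrame API; a minimal sketch (the `'NoPipeline'` label is illustrative, not from the original repro):
```
import apache_beam as beam

p = beam.Pipeline()

# Works: the pipeline is passed explicitly, the same way
# dataframe/transforms.py:467 does for beam.Flatten.
ok = () | 'WithPipeline' >> beam.Flatten(pipeline=p)

# Raises the same error: an empty tuple carries no deferred inputs
# from which __ror__ could infer a pipeline.
broken = () | 'NoPipeline' >> beam.Flatten()
# ValueError: "NoPipeline" requires a pipeline to be specified
# as there are no deferred inputs.
```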
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner