tvalentyn opened a new issue, #31855:
URL: https://github.com/apache/beam/issues/31855

   ### What happened?
   
   I was experimenting with the Beam DataFrame API through Beam Notebooks and the
   Interactive runner, and wasn't able to use `fillna` on individual columns. Here
   is a repro on a DataFrame with two columns:
   
   ```
   %%writefile numbers.csv
   col1,col2
   1,1
   NaN,1
   -1,1
   ```
   
   ```
   import apache_beam as beam
   import apache_beam.runners.interactive.interactive_beam as ib
   from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
   pipeline = beam.Pipeline(InteractiveRunner())
   
   beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('numbers.csv')
   beam_df['col1'] = beam_df['col1'].fillna(0)
   ib.collect(beam_df)
   ```
   
   This fails with the following error:
   `ValueError: "[ConstantExpression[constant_int_140226804834624]]:140226804986784" requires a pipeline to be specified as there are no deferred inputs.`
   
   A rewritten version: 
   ```
   c1 = beam_df['col1']
   c1 = c1.fillna(0)
   ib.collect(c1)  
   ```
   also fails.
   
   The equivalent snippets run without issues on pandas or Dask.
   
   <details>
   <summary>Full Stacktrace</summary>
   
   ---------------------------------------------------------------------------
   ValueError                                Traceback (most recent call last)
   Cell In[62], line 16
        14 beam_df = pipeline | 'Read CSV' >> 
beam.dataframe.io.read_csv('numbers.csv')
        15 beam_df['col1'] = beam_df['col1'].fillna(0)
   ---> 16 ib.collect(beam_df)  # fails with ValueError: "[ConstantExpression[constant_int_140226804834624]]:140226804986784" requires a pipeline to be specified as there are no deferred inputs.
        18 c1 = beam_df['col1']
        19 c1 = c1.fillna(0)
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/interactive/utils.py:277,
 in progress_indicated.<locals>.run_within_progress_indicator(*args, **kwargs)
       274 @functools.wraps(func)
       275 def run_within_progress_indicator(*args, **kwargs):
       276   with ProgressIndicator(f'Processing... {func.__name__}', 'Done.'):
   --> 277     return func(*args, **kwargs)
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/interactive/interactive_beam.py:906,
 in collect(pcoll, n, duration, include_window_info)
       902 # Remember the element type so we can make an informed decision on 
how to
       903 # collect the result in elements_to_df.
       904 if isinstance(pcoll, DeferredBase):
       905   # Get the proxy so we can get the output shape of the DataFrame.
   --> 906   pcoll, element_type = deferred_df_to_pcollection(pcoll)
       907   watch({'anonymous_pcollection_{}'.format(id(pcoll)): pcoll})
       908 else:
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/interactive/utils.py:313,
 in deferred_df_to_pcollection(df)
       310 cache.replace_with_cached(df._expr)
       312 proxy = df._expr.proxy()
   --> 313 return to_pcollection(df, yield_elements='pandas', 
label=str(df._expr)), proxy
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/convert.py:261,
 in to_pcollection(label, always_return_tuple, yield_elements, include_indexes, 
pipeline, *dataframes)
       257 new_dataframes = [
       258     df for df in dataframes if df._expr._id not in 
TO_PCOLLECTION_CACHE
       259 ]
       260 if len(new_dataframes):
   --> 261   new_results = {p: extract_input(p)
       262                  for p in placeholders
       263                  } | label >> 
transforms._DataframeExpressionsTransform({
       264                      ix: df._expr
       265                      for (ix, df) in enumerate(new_dataframes)
       266                  })  # type: Dict[Any, pvalue.PCollection]
       268   TO_PCOLLECTION_CACHE.update(
       269       {new_dataframes[ix]._expr._id: pc
       270        for ix, pc in new_results.items()})
       272 raw_results = {
       273     ix: TO_PCOLLECTION_CACHE[df._expr._id]
       274     for ix,
       275     df in enumerate(dataframes)
       276 }
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/transforms/ptransform.py:1110,
 in _NamedPTransform.__ror__(self, pvalueish, _unused)
      1109 def __ror__(self, pvalueish, _unused=None):
   -> 1110   return self.transform.__ror__(pvalueish, self.label)
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/transforms/ptransform.py:623,
 in PTransform.__ror__(self, left, label)
       621 pvalueish = _SetInputPValues().visit(pvalueish, replacements)
       622 self.pipeline = p
   --> 623 result = p.apply(self, pvalueish, label)
       624 if deferred:
       625   return result
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/pipeline.py:679,
 in Pipeline.apply(self, transform, pvalueish, label)
       677 old_label, transform.label = transform.label, label
       678 try:
   --> 679   return self.apply(transform, pvalueish)
       680 finally:
       681   transform.label = old_label
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/pipeline.py:732,
 in Pipeline.apply(self, transform, pvalueish, label)
       729 if type_options.pipeline_type_check:
       730   transform.type_check_inputs(pvalueish)
   --> 732 pvalueish_result = self.runner.apply(transform, pvalueish, 
self._options)
       734 if type_options is not None and type_options.pipeline_type_check:
       735   transform.type_check_outputs(pvalueish_result)
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/interactive/interactive_runner.py:131,
 in InteractiveRunner.apply(self, transform, pvalueish, options)
       129 def apply(self, transform, pvalueish, options):
       130   # TODO(qinyeli, BEAM-646): Remove runner interception of apply.
   --> 131   return self._underlying_runner.apply(transform, pvalueish, options)
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/runner.py:203,
 in PipelineRunner.apply(self, transform, input, options)
       197 def apply(self,
       198           transform,  # type: PTransform
       199           input,  # type: Optional[pvalue.PValue]
       200           options  # type: PipelineOptions
       201          ):
       202   # TODO(robertwb): Remove indirection once internal references are 
fixed.
   --> 203   return self.apply_PTransform(transform, input, options)
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/runner.py:207,
 in PipelineRunner.apply_PTransform(self, transform, input, options)
       205 def apply_PTransform(self, transform, input, options):
       206   # TODO(robertwb): Remove indirection once internal references are 
fixed.
   --> 207   return transform.expand(input)
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:151,
 in _DataframeExpressionsTransform.expand(self, inputs)
       150 def expand(self, inputs):
   --> 151   return self._apply_deferred_ops(inputs, self._outputs)
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:470,
 in _DataframeExpressionsTransform._apply_deferred_ops(self, inputs, outputs)
       467     return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
       469 # Now we can compute and return the result.
   --> 470 return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:470,
 in <dictcomp>(.0)
       467     return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
       469 # Now we can compute and return the result.
   --> 470 return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:562,
 in _memoize.<locals>.wrapper(*args, **kwargs)
       560 key = args, tuple(sorted(kwargs.items()))
       561 if key not in cache:
   --> 562   cache[key] = f(*args, **kwargs)
       563 return cache[key]
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:432,
 in 
_DataframeExpressionsTransform._apply_deferred_ops.<locals>.expr_to_pcoll(expr)
       430   return inputs[expr]
       431 else:
   --> 432   return stage_to_result(expr_to_stage(expr))[expr._id]
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:562,
 in _memoize.<locals>.wrapper(*args, **kwargs)
       560 key = args, tuple(sorted(kwargs.items()))
       561 if key not in cache:
   --> 562   cache[key] = f(*args, **kwargs)
       563 return cache[key]
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:424,
 in 
_DataframeExpressionsTransform._apply_deferred_ops.<locals>.stage_to_result(stage)
       422 @_memoize
       423 def stage_to_result(stage):
   --> 424   return {expr._id: expr_to_pcoll(expr)
       425           for expr in stage.inputs} | ComputeStage(stage)
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:424,
 in <dictcomp>(.0)
       422 @_memoize
       423 def stage_to_result(stage):
   --> 424   return {expr._id: expr_to_pcoll(expr)
       425           for expr in stage.inputs} | ComputeStage(stage)
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:562,
 in _memoize.<locals>.wrapper(*args, **kwargs)
       560 key = args, tuple(sorted(kwargs.items()))
       561 if key not in cache:
   --> 562   cache[key] = f(*args, **kwargs)
       563 return cache[key]
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:432,
 in 
_DataframeExpressionsTransform._apply_deferred_ops.<locals>.expr_to_pcoll(expr)
       430   return inputs[expr]
       431 else:
   --> 432   return stage_to_result(expr_to_stage(expr))[expr._id]
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:562,
 in _memoize.<locals>.wrapper(*args, **kwargs)
       560 key = args, tuple(sorted(kwargs.items()))
       561 if key not in cache:
   --> 562   cache[key] = f(*args, **kwargs)
       563 return cache[key]
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:424,
 in 
_DataframeExpressionsTransform._apply_deferred_ops.<locals>.stage_to_result(stage)
       422 @_memoize
       423 def stage_to_result(stage):
   --> 424   return {expr._id: expr_to_pcoll(expr)
       425           for expr in stage.inputs} | ComputeStage(stage)
   
   File 
/jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/transforms/ptransform.py:602,
 in PTransform.__ror__(self, left, label)
       600     p = self.pipeline
       601   else:
   --> 602     raise ValueError(
       603         '"%s" requires a pipeline to be specified '
       604         'as there are no deferred inputs.' % self.label)
       605 else:
       606   p = self.pipeline or pipelines[0]
   
   ValueError: "[ConstantExpression[constant_int_140226803777984]]:140226803785616" requires a pipeline to be specified as there are no deferred inputs.
   import apache_beam as beam
   
   
   </details>
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to