TheNeuralBit commented on a change in pull request #14778:
URL: https://github.com/apache/beam/pull/14778#discussion_r636476270
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -539,8 +541,16 @@ def collect(pcoll, n='inf', duration='inf',
include_window_info=False):
# Run the pipeline and bring the PCollection into memory as a Dataframe.
in_memory_square = head(square, n=5)
"""
+ # Remember the element type so we can make an informed decision on how to
+ # collect the result in elements_to_df.
if isinstance(pcoll, DeferredBase):
- pcoll = to_pcollection(pcoll)
+ # Get the proxy so we can get the output shape of the DataFrame.
+ element_type = pcoll._expr.proxy()
Review comment:
It's interesting to use the proxy as the element type. Note I filed
BEAM-11064 for creating a pandas typehint. Once that exists we should use it
here, but this makes sense for now.
##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -298,8 +298,12 @@ def _watch(self, pcolls):
watched_pcollections.add(val)
elif isinstance(val, DeferredBase):
watched_dataframes.add(val)
- # Convert them all in a single step for efficiency.
- for pcoll in to_pcollection(*watched_dataframes, always_return_tuple=True):
+
+ # Convert them one-by-one to generate a unique label for each. This allows
+ # caching at a more fine-grained granularity.
+ for df in watched_dataframes:
+ pcoll = to_pcollection(
+ df, yield_elements='pandas', label=str(id(df._expr._id)))
Review comment:
Ack, ok that makes sense. Could you file a Jira and/or drop a TODO for
this?