pritamdodeja commented on issue #35867:
URL: https://github.com/apache/beam/issues/35867#issuecomment-3207983763
Here's how I can reproduce the bug: execute ```debug_prism.py``` with ```MyRecordSchema.py``` in the same directory.
I had suspected this had something to do with Metrics, but I'm no longer sure about that.
```
# debug_prism.py
import csv
import datetime
import logging
import os
import random
import shutil
import tempfile
from typing import (
Any,
NamedTuple,
Optional,
get_args,
)
import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window
from MyRecordSchema import MyRecord
# --- Configuration ---
OUTPUT_FILE = 'random_data.csv'
NUMBER_OF_ROWS = 5000  # PrismRunner fails at this number of rows, but succeeds for a lower number of rows (e.g. 2000)
START_DATE = datetime.datetime(2016, 1, 1, 0, 0, 0)
END_DATE = datetime.datetime(2016, 6, 1, 0, 0, 0)
print(f"Generating data for '{OUTPUT_FILE}' from {START_DATE} to {END_DATE}
(max {NUMBER_OF_ROWS} rows)...")
current_time = START_DATE
rows_written = 0
DATA_CONFIG = {
# Integers with a specific range
"rybq": {"type": "integer", "params": (1, 5)},
"qjmw": {"type": "integer", "params": (1, 5)},
"ksel": {"type": "integer", "params": (1, 5)},
"eqqq": {"type": "integer", "params": (2000, 2500)},
"xrus": {"type": "integer", "params": (30, 50)},
"biej": {"type": "integer", "params": (400, 500)},
"gftv": {"type": "integer", "params": (350, 450)},
"qcpd": {"type": "integer", "params": (400, 450)},
"hnox": {"type": "integer", "params": (60, 80)},
# Normally distributed floats
"fthm": {"type": "normal", "params": (65.0, 2.0)},
"xkqg": {"type": "normal", "params": (65.0, 2.0)},
"fuil": {"type": "normal", "params": (78.0, 1.0)},
"iusu": {"type": "normal", "params": (78.0, 1.0)},
"sczf": {"type": "integer", "params": (1, 5)},
"atlf": {"type": "integer", "params": (2300, 2500)},
"nedy": {"type": "integer", "params": (30, 40)},
# Uniformly distributed floats
"kxyq": {"type": "uniform", "params": (20.0, 30.0)},
"gmla": {"type": "uniform", "params": (60.0, 70.0)},
"ekld": {"type": "uniform", "params": (60.0, 70.0)},
"hwve": {"type": "uniform", "params": (80.0, 90.0)},
"tklu": {"type": "uniform", "params": (80.0, 90.0)},
"pzoa": {"type": "uniform", "params": (0.0, 5.0)},
"qmkg": {"type": "uniform", "params": (0.0, 5.0)},
"pazj": {"type": "uniform", "params": (0.0, 1.0)},
"qdym": {"type": "uniform", "params": (0.0, 1.0)},
"xyio": {"type": "uniform", "params": (50.0, 60.0)},
"vxtg": {"type": "uniform", "params": (25.0, 35.0)},
"nzpk": {"type": "uniform", "params": (25.0, 35.0)},
"ypvj": {"type": "uniform", "params": (20.0, 25.0)},
"mohz": {"type": "integer", "params": (1, 10)},
"vpfz": {"type": "integer", "params": (50, 60)},
"efoo": {"type": "uniform", "params": (25.0, 35.0)},
"aggn": {"type": "uniform", "params": (25.0, 35.0)},
"wlwb": {"type": "integer", "params": (20, 30)},
"bjqq": {"type": "uniform", "params": (70.0, 80.0)},
"wzbq": {"type": "uniform", "params": (40.0, 50.0)},
"juks": {"type": "uniform", "params": (30.0, 40.0)},
"dvdl": {"type": "uniform", "params": (20.0, 30.0)},
# Categorical data
"avit": {"type": "categorical", "params": [0, 1]},
"nrha": {"type": "categorical", "params": [0, 1]},
"lejm": {"type": "categorical", "params": [0, 1]},
"kwyk": {"type": "categorical", "params": [0, 1]},
"prqy": {"type": "categorical", "params": [0, 1]},
"putr": {"type": "categorical", "params": [0, 1]},
"cpwn": {"type": "categorical", "params": [0, 1]},
"dwlo": {"type": "categorical", "params": [0, 1]},
"vhol": {"type": "categorical", "params": [1, 2, 3]},
"wrmw": {"type": "categorical", "params": [1, 2, 3]},
"gnmk": {"type": "categorical", "params": [1, 2, 3]},
"neex": {"type": "categorical", "params": [1, 2, 3]},
"ylgu": {"type": "categorical", "params": [1, 2, 3, 4]},
"kvyd": {"type": "categorical", "params": [1, 2, 3, 4]},
"ywge": {"type": "categorical", "params": [1, 2, 3, 4]},
"iuvv": {"type": "categorical", "params": [1, 2, 3, 4]},
"kaij": {"type": "categorical", "params": [1, 2, 3, 4, 5]},
"nbgs": {"type": "categorical", "params": [1, 2, 3, 4, 5]},
"llxf": {"type": "categorical", "params": [1, 2, 3, 4, 5]},
"mwie": {"type": "categorical", "params": [1, 2, 3, 4, 5]},
}
MISSING_VALUE_CONFIG = {
"probability": 0.15, # 15% chance for a value to be missing
"features": [
"cpwn", "dwlo", "ekld", "hnox", "iusu", "kaij", "llxf", "mwie",
"nbgs", "prqy", "putr", "qcpd", "tklu", "xkqg"
]
}
HEADER = ["timestamp"] + list(DATA_CONFIG.keys())
def generate_random_row(current_timestamp):
"""Generates a single row of random data based on the DATA_CONFIG."""
row = {"timestamp": current_timestamp.strftime('%Y-%m-%d %H:%M:%S')}
for feature, config in DATA_CONFIG.items():
# Check if this feature should have a missing value
        if feature in MISSING_VALUE_CONFIG["features"] and random.random() < MISSING_VALUE_CONFIG["probability"]:
row[feature] = ''
continue
# Generate data based on the specified type
if config["type"] == "normal":
mean, std_dev = config["params"]
row[feature] = random.normalvariate(mean, std_dev)
elif config["type"] == "uniform":
min_val, max_val = config["params"]
row[feature] = random.uniform(min_val, max_val)
elif config["type"] == "integer":
min_val, max_val = config["params"]
row[feature] = random.randint(min_val, max_val)
elif config["type"] == "categorical":
row[feature] = random.choice(config["params"])
else:
row[feature] = ''
return [row[h] for h in HEADER]
with open(OUTPUT_FILE, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
# Write the header row
writer.writerow(HEADER)
# Write the data rows until we reach the end date or max rows
while current_time <= END_DATE and rows_written < NUMBER_OF_ROWS:
# Increment the timestamp by a random amount (e.g., 1 to 300 seconds)
current_time += datetime.timedelta(seconds=random.randint(1, 300))
row_data = generate_random_row(current_time)
writer.writerow(row_data)
rows_written += 1
print(f"Data generation complete. Wrote {rows_written} rows.")
# Data Processing Section
OUTPUT_FREQUENCY_SECONDS = 60
TARGET_SEQUENCE_STEPS = 20
WINDOW_SIZE_SECONDS = TARGET_SEQUENCE_STEPS * OUTPUT_FREQUENCY_SECONDS
INPUT_FILE_NO_HEADER = "./random_data.csv"
beam.coders.registry.register_coder(MyRecord, beam.coders.RowCoder)
class PartitionDoFn(beam.DoFn):
def __init__(self, fixed_threshold=None):
self._fixed_threshold = fixed_threshold
def setup(self):
import logging
self.logger = logging
def process(self, element_tuple, threshold_from_side_input=None):
current_key = element_tuple[0]
actual_threshold = self._fixed_threshold
if actual_threshold is None:
if threshold_from_side_input is None:
self.logger.error("Class {__class__.__name__} PartitionDoFn:
No fixed threshold set and no threshold provided from side input. Halting.")
raise ValueError("PartitionDoFn requires a threshold.")
actual_threshold = threshold_from_side_input
if current_key <= actual_threshold:
yield beam.pvalue.TaggedOutput('train', element_tuple)
else:
yield beam.pvalue.TaggedOutput('eval', element_tuple)
class AddTimestamps(beam.PTransform):
"""Assigns event timestamps to records based on their 'timestamp'
field."""
def __init__(self, obj):
self.obj = obj
class _AddTimestampsDoFn(beam.DoFn): # Renamed inner class
def setup(self):
import logging
self.logger = logging
def process(self, element):
"""Yields the element wrapped in TimestampedValue."""
try:
ts = element.timestamp
if not isinstance(ts, datetime.datetime):
self.logger.error(f"Class {__class__.__name__}:
AddTimestamps: Expected datetime, got {type(ts)} for {element}")
beam_timestamp = ts.timestamp()
yield beam.window.TimestampedValue(element, beam_timestamp)
except Exception as e:
self.logger.error(f"Class {__class__.__name__}: Error in
AddTimestamps for UUID {element}, timestamp '{element.timestamp}': {e}")
def expand(self, input_pcollection):
return input_pcollection | "AddTS_DoFn" >> beam.ParDo(
self._AddTimestampsDoFn())
class ResampleWindowedDataAndAssignStartKey(beam.DoFn):
def __init__(self, sampling_frequency=30, target_steps=20, obj=None):
self._sampling_frequency = sampling_frequency
self._target_steps = target_steps
        self.processed_windows_counter = Metrics.counter(self.__class__.__name__, 'processed_windows')
        self.empty_windows_counter = Metrics.counter(self.__class__.__name__, 'empty_or_skipped_windows')
        self.records_per_window_dist = Metrics.distribution(self.__class__.__name__, 'records_per_window')
self.obj = obj
def setup(self):
import logging
self.logger = logging
import pandas as pd
self._pd = pd
from datetime import datetime, timedelta
self._datetime = datetime
self._timedelta = timedelta
import numpy as np
self._np = np
def resample_asof(
self,
myrecord_df,
start_time,
sampling_frequency,
num_target_steps,
):
"""
        Resamples a DataFrame to a new frequency starting from a specified time
        for a specific number of steps, using 'as of' logic.
"""
expected_cols = ['timestamp']
if myrecord_df is not None and hasattr(myrecord_df, 'columns'):
            expected_cols.extend([col for col in myrecord_df.columns if col != 'timestamp'])
if myrecord_df is None or myrecord_df.empty:
self.logger.debug(f"{self.__class__.__name__}.resample_asof
received empty or None DataFrame.")
return self._pd.DataFrame(columns=expected_cols)
df = myrecord_df.copy()
if 'timestamp' not in df.columns:
            if df.index.name == 'timestamp' and isinstance(df.index, self._pd.DatetimeIndex):
df = df.reset_index()
else:
self.logger.error("Class {__class__.__name__} 'timestamp'
column not found in DataFrame for resample_asof.")
return self._pd.DataFrame(columns=expected_cols)
if df.empty:
self.logger.debug("Class {__class__.__name__} DataFrame became
empty after handling timestamp column for resample_asof.")
return self._pd.DataFrame(columns=expected_cols)
if not self._pd.api.types.is_datetime64_any_dtype(df['timestamp']):
try:
df['timestamp'] = self._pd.to_datetime(df['timestamp'])
except Exception as e:
self.logger.error(f"Class {__class__.__name__}: Could not
convert 'timestamp' column to datetime in resample_asof: {e}")
return self._pd.DataFrame(columns=expected_cols)
df = df.sort_values('timestamp').reset_index(drop=True)
if df.empty:
self.logger.debug("DataFrame empty after sorting in
resample_asof.")
return self._pd.DataFrame(columns=expected_cols)
if not isinstance(start_time, self._datetime):
self.logger.error(f"Class {__class__.__name__}: resample_asof
expects start_time to be datetime.datetime, got {type(start_time)}")
return self._pd.DataFrame(columns=expected_cols)
actual_sampling_frequency = sampling_frequency
        if not (actual_sampling_frequency and hasattr(actual_sampling_frequency, 'total_seconds')
                and actual_sampling_frequency.total_seconds() > 0):
            self.logger.error(f"Class {__class__.__name__}: Invalid actual_sampling_frequency: {actual_sampling_frequency} in resample_asof.")
return self._pd.DataFrame(columns=expected_cols)
try:
df = df.set_index('timestamp')
except Exception as e:
self.logger.error(f"Class {__class__.__name__}: Error setting
'timestamp' as index in resample_asof: {e}")
return self._pd.DataFrame(columns=expected_cols)
        df_filtered = df[df.index <= start_time + self._timedelta(seconds=self._target_steps * self._sampling_frequency)]
if df_filtered.empty:
self.logger.debug(f"Class {__class__.__name__}: DataFrame empty
after filtering by start_time '{start_time}' in resample_asof.")
return self._pd.DataFrame(columns=expected_cols)
resampled_index = None
if num_target_steps > 0:
try:
resampled_index = self._pd.date_range(
start=start_time,
periods=num_target_steps,
freq=actual_sampling_frequency
)
except ValueError as e:
self.logger.error(f"Class {__class__.__name__}: Error
creating date_range with start={start_time}, periods={num_target_steps},
freq={actual_sampling_frequency}: {e}")
return self._pd.DataFrame(columns=expected_cols)
else:
self.logger.warning(f"Class {__class__.__name__}:
num_target_steps is not positive ({num_target_steps}), cannot generate
resampled_index.")
return self._pd.DataFrame(columns=expected_cols)
if resampled_index is None or resampled_index.empty:
self.logger.warning("Resampled index is None or empty in
resample_asof.")
return self._pd.DataFrame(columns=expected_cols)
resampled_df = df_filtered.reindex(resampled_index, method='ffill')
if resampled_df.empty:
self.logger.debug("DataFrame empty after reindex in
resample_asof.")
return self._pd.DataFrame(columns=expected_cols)
resampled_df = resampled_df.reset_index()
resampled_df = resampled_df.rename(columns={'index': 'timestamp'})
        value_columns = [col for col in resampled_df.columns if col != 'timestamp']
if value_columns:
            resampled_df = resampled_df.dropna(subset=value_columns, how='all')
if resampled_df.empty:
self.logger.debug("Class {__class__.__name__} Resampled
DataFrame became empty after dropna in resample_asof.")
return self._pd.DataFrame(columns=expected_cols)
        final_ordered_columns = [col for col in expected_cols if col in resampled_df.columns]
for col in resampled_df.columns:
if col not in final_ordered_columns:
final_ordered_columns.append(col)
return resampled_df[final_ordered_columns]
def process(
self,
element: tuple[int, list[Any]],
window=beam.DoFn.WindowParam
):
window_start_key, records_list = element
num_records = len(records_list)
self.processed_windows_counter.inc()
self.records_per_window_dist.update(num_records)
if num_records > 1000: # Adjust this threshold as needed
            self.logger.warning(
                f"Processing a LARGE window (key: {window_start_key}) with {num_records} records. "
                f"This may indicate data skew and is a likely bottleneck."
            )
if not records_list:
self.logger.debug(f"{self.__class__.__name__}.process received
empty list.")
self.empty_windows_counter.inc()
yield None, None
if not element[1]:
self.logger.debug(f"{self.__class__.__name__}.process received
empty list.")
yield None, None
# Timestamps in 'element' should be datetime.datetime objects
sorted_original_list = sorted(element[1], key=lambda x: x.timestamp)
if not sorted_original_list:
            # This case should ideally not be reached if 'element' is not empty
self.logger.debug(f"{self.__class__.__name__}:
sorted_original_list is empty.")
yield None, None
dlist = [d._asdict() for d in sorted_original_list]
myrecord_df_original = self._pd.DataFrame(dlist)
        if myrecord_df_original.empty or 'timestamp' not in myrecord_df_original.columns:
            self.logger.debug(f"{self.__class__.__name__}: DataFrame from original list is empty or missing 'timestamp'.")
yield None, None
start_time_for_resampling = myrecord_df_original.iloc[0]['timestamp']
if isinstance(start_time_for_resampling, self._pd.Timestamp):
            start_time_for_resampling = start_time_for_resampling.to_pydatetime()
if self._pd.isna(start_time_for_resampling):
self.logger.warning(f"{self.__class__.__name__}:
start_time_for_resampling is NaT.")
yield None, None
        sampling_timedelta = self._timedelta(seconds=self._sampling_frequency)
resampled_df = self.resample_asof(
myrecord_df_original,
start_time=start_time_for_resampling,
sampling_frequency=sampling_timedelta,
num_target_steps=self._target_steps
)
if resampled_df.empty or 'timestamp' not in resampled_df.columns:
self.logger.debug(f"{self.__class__.__name__}: Resampling for
window starting near {start_time_for_resampling} yielded empty/invalid
DataFrame.")
yield None, None
# Calculate 'internal_sequence' for the resampled data.
# Assumes resampled_df['timestamp'] are UTC-aware pandas Timestamps.
        resampled_df['internal_sequence'] = (resampled_df['timestamp'].astype(self._np.int64) // 10**9)
        # Removed: + 18000
myrecord_list_resampled = []
for i in range(len(resampled_df)):
myrecord_dict = resampled_df.iloc[i].to_dict()
            if 'timestamp' in myrecord_dict and isinstance(myrecord_dict['timestamp'], self._pd.Timestamp):
                myrecord_dict['timestamp'] = myrecord_dict['timestamp'].to_pydatetime()
try:
del myrecord_dict['internal_sequence']
myrecord_object = self.obj(**myrecord_dict)
myrecord_list_resampled.append(myrecord_object)
except Exception as e:
self.logger.error(f"Class {__class__.__name__}: Failed to
create MyRecord in {self.__class__.__name__} from dict {myrecord_dict}: {e}")
continue
if not myrecord_list_resampled:
self.logger.debug(f"{self.__class__.__name__}: Resampled list is
empty after object conversion, not yielding.")
yield None, None
        key_for_partitioning = int(myrecord_list_resampled[-1].timestamp.timestamp())
yield key_for_partitioning, myrecord_list_resampled
class ConvertToMyRecordBasic(beam.DoFn):
"""Parses a CSV string line into a MyRecord using
convert_to_datapoint."""
def __init__(self, obj):
self.obj = obj
def setup(self):
import logging
self.logger = logging
import csv
self.csv = csv
# self.convert_to_datapoint is available globally
# self.MyRecord is available globally
    def convert_to_datapoint(self, row: list, obj: type) -> tuple[Optional[NamedTuple], Optional[str]]:
def get_dict_name_type_mapping(obj):
dict_name_type_mapping = {}
            for column_name, column_typing in zip(obj._fields, obj.__annotations__.values()):
typing_args = get_args(column_typing)
if len(typing_args) == 2:
                    dict_name_type_mapping[column_name] = [t for t in typing_args if t is not type(None)][0]
else:
dict_name_type_mapping[column_name] = column_typing
return dict_name_type_mapping
dict_name_value_mapping = dict(zip(obj._fields, row))
dict_name_type_mapping = get_dict_name_type_mapping(obj)
converted_dict = {}
imputation_values_dict = {}
imputation_values_dict[int] = -1
imputation_values_dict[float] = -1.0
imputation_values_dict[str] = "missing"
        imputation_values_dict[datetime.datetime] = datetime.datetime.fromtimestamp(0, tz=datetime.timezone.utc)
        for feature_name, feature_value, feature_type in zip(dict_name_value_mapping.keys(), row, dict_name_type_mapping.values()):
try:
if feature_value in imputation_values_dict.values():
                    # We have a feature whose value is something we use to indicate the absence of a value
                    raise ValueError(f"Feature {feature_name} has a value {feature_value} which we use to indicate absence of value.")
converted_dict[feature_name] = feature_type(feature_value)
except Exception:
if feature_type == datetime.datetime:
                    formats_to_try = [
                        '%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S',
                        '%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S',
                        '%Y-%m-%d %H:%M:%S.%f %Z', '%Y-%m-%d %H:%M:%S %Z']
for fmt in formats_to_try:
try:
                            dt_obj = datetime.datetime.strptime(feature_value, fmt)
                            converted_dict[feature_name] = dt_obj.replace(tzinfo=datetime.timezone.utc)  # Assume UTC if naive
except ValueError:
continue
else:
                    converted_dict[feature_name] = imputation_values_dict[feature_type]
try:
return obj(**converted_dict)
except TypeError as e:
self.logger.error(f"Class {__class__.__name__}: Final
instantiation of {obj.__name__} failed with data {converted_dict}: {e}")
return None
def process(self, element: str):
# Use csv module for more robust parsing than simple split
try:
            # Wrap element in a list because csv.reader expects an iterable of lines
reader = self.csv.reader(
[element],
quotechar='"',
delimiter=",",
quoting=self.csv.QUOTE_ALL,
skipinitialspace=True)
row_raw = next(reader) # Get the first (and only) row
except StopIteration:
            self.logger.warning(
                f"Class {__class__.__name__}: CSV reader failed for element: '{element}'. Skipping.")
yield
except Exception as e:
            self.logger.error(
                f"Class {__class__.__name__}: CSV parsing error for element '{element[:100]}...': {e}")
yield # Skip rows with parsing errors
if row_raw:
try:
datapoint = self.convert_to_datapoint(row_raw, self.obj)
yield datapoint
except Exception as e:
                self.logger.error(
                    f"Class {__class__.__name__}: Failed creating MyRecord from {datapoint} with exception {e}")
options = {
    'runner': 'PrismRunner',  # Toggle between DirectRunner and PrismRunner to avoid/create the bug; comment out direct_num_workers and direct_running_mode as not applicable
    # 'direct_num_workers': 0,  # 0 often defaults to thread-based or single process
    # 'direct_running_mode': 'multi_processing',  # Or 'multi_threading'
    # 'semi_persistent_directory': tempfile.mkdtemp(),  # Optional
    # 'profile_cpu': True,
    # 'profile_memory': True,
    # 'profile_location': "./beam_profile"
}
opts = PipelineOptions(flags=[], **options)
output_base_path = "./prism"
logger = logging
def process_and_write(pcoll, split_name, output_path):
split_output_dir = os.path.join(output_path, f"Split-{split_name}")
try:
if os.path.exists(split_output_dir):
shutil.rmtree(split_output_dir)
os.makedirs(split_output_dir)
logger.info(f"Prepared output directory {split_output_dir}")
except Exception as e:
logger.warning(f"Could not manage directory {split_output_dir}: {e}")
    # This transform now has three outputs: main (TFRecord), sequences_data, and mapping_data
    processed_outputs = (
        pcoll
        | f"Extract_{split_name}{output_path}_Sequences" >> beam.MapTuple(lambda seq_id, steps: list(steps))
    )
with beam.Pipeline(options=opts) as p:
parsed_timestamped_pcollection = (
p
| "ReadCSV" >> beam.io.ReadFromText(INPUT_FILE_NO_HEADER,
skip_header_lines=1)
| "ParseCSV" >>
beam.ParDo(ConvertToMyRecordBasic(obj=MyRecord).with_output_types(MyRecord))
| "AddEventTimestamps" >>
AddTimestamps(obj=MyRecord).with_output_types(MyRecord)
# | beam.combiners.Count.Globally() | beam.LogElements()
)
list_of_fixed_windows = []
for offset in range(0, 60, 5):
        temp_windowed_grouped_sequences = (
            parsed_timestamped_pcollection
            # | "ApplySlidingWindow" >> beam.WindowInto(window.SlidingWindows(WINDOW_SIZE_SECONDS, WINDOW_PERIOD_SECONDS))
            | f"ApplyFixedWindowOffset{str(offset)}" >> beam.WindowInto(window.FixedWindows(WINDOW_SIZE_SECONDS, offset=offset))
            | f"ApplyMap{str(offset)}" >> beam.Map(lambda element, w=beam.DoFn.WindowParam: (int(w.start), element))
            # | f"ApplyGroupByKey{str(offset)}" >> beam.GroupByKey().with_output_types(tuple[int, list[MyRecord]])
            # | beam.CombinePerKey(beam.combiners.ToListCombineFn()).with_hot_key_fanout(4)  # does not work with sliding windows
            | f"ApplyCombine{offset}" >> beam.CombinePerKey(beam.combiners.ToListCombineFn())
            | f"ResampleAndKey{offset}" >> beam.ParDo(ResampleWindowedDataAndAssignStartKey(
                sampling_frequency=OUTPUT_FREQUENCY_SECONDS,
                target_steps=TARGET_SEQUENCE_STEPS,
                obj=MyRecord,
            )).with_output_types(tuple[int, list[MyRecord]])
        )
list_of_fixed_windows.append(temp_windowed_grouped_sequences)
windowed_grouped_sequences = list_of_fixed_windows | beam.Flatten()
data_split = (
windowed_grouped_sequences
        # Pass the side input to ParDo. It will be an extra argument to PartitionDoFn.process
| "PartitionData" >> beam.ParDo(
PartitionDoFn(fixed_threshold=1459503446),
).with_outputs('train', 'eval')
)
train_pcoll = data_split.train
eval_pcoll = data_split.eval
train_output = process_and_write(train_pcoll, "train", output_base_path)
eval_output = process_and_write(eval_pcoll, "eval", output_base_path)
```
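The two knobs I flip when reproducing this are already in the script above: the runner in the options dict and NUMBER_OF_ROWS near the top. As a convenience, here is a minimal sketch of just that toggle (the helper name and the boolean flag are illustrative only; `runner` is the only setting that matters, exactly as in the options dict above):
```
# Minimal sketch of the runner toggle described in the options dict above.
# 'make_options' and 'use_prism' are just illustrative names.
from apache_beam.options.pipeline_options import PipelineOptions

def make_options(use_prism: bool) -> PipelineOptions:
    # With NUMBER_OF_ROWS = 5000 the PrismRunner run hangs for me,
    # while DirectRunner completes on the same generated input
    # (and lower row counts, e.g. 2000, pass on PrismRunner too).
    runner = 'PrismRunner' if use_prism else 'DirectRunner'
    return PipelineOptions(flags=[], runner=runner)
```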
```
# MyRecordSchema.py
from typing import NamedTuple, Optional
import datetime
class MyRecord(NamedTuple):
timestamp: datetime.datetime
rybq: int
qjmw: int
ksel: int
eqqq: int
xrus: int
biej: int
gftv: int
qcpd: Optional[int]
hnox: Optional[int]
fthm: float
xkqg: Optional[float]
fuil: float
iusu: Optional[float]
sczf: int
atlf: int
nedy: int
kxyq: float
gmla: float
ekld: Optional[float]
hwve: float
tklu: Optional[float]
pzoa: float
qmkg: float
pazj: float
qdym: float
xyio: float
vxtg: float
nzpk: float
ypvj: float
mohz: int
vpfz: int
efoo: float
aggn: float
wlwb: int
bjqq: float
wzbq: float
juks: float
dvdl: float
avit: int
nrha: int
lejm: int
kwyk: int
prqy: Optional[int]
putr: Optional[int]
cpwn: Optional[int]
dwlo: Optional[int]
vhol: int
wrmw: int
gnmk: int
neex: int
ylgu: int
kvyd: int
ywge: int
iuvv: int
kaij: Optional[int]
nbgs: Optional[int]
llxf: Optional[int]
mwie: Optional[int]
```
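The only thing debug_prism.py does with this schema up front is register it with RowCoder. A standalone sketch of that step, assuming the same Beam environment as the repro (the final print is just a sanity check and not part of the repro itself):
```
# Standalone sketch: register MyRecord with RowCoder, exactly as the repro
# does, and ask the coder registry which coder it will use for MyRecord.
import apache_beam as beam
from MyRecordSchema import MyRecord

beam.coders.registry.register_coder(MyRecord, beam.coders.RowCoder)
print(beam.coders.registry.get_coder(MyRecord))  # expect a RowCoder for MyRecord
```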
The error I get is:
```
Exception in thread wait_until_finish_read:
Traceback (most recent call last):
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
Traceback (most recent call last):
  File "/home/pritamdodeja/demo/template/debug_prism.py", line 554, in <module>
    self.run()
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/threading.py", line 953, in run
    with beam.Pipeline(options=opts) as p:
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/pipeline.py", line 663, in __exit__
    self._target(*self._args, **self._kwargs)
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", line 533, in read_messages
    self.result.wait_until_finish()
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", line 568, in wait_until_finish
    for message in self._message_stream:
    raise self._runtime_exception
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/grpc/_channel.py", line 543, in __next__
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", line 574, in _observe_state
    for state_response in self._state_stream:
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/grpc/_channel.py", line 543, in __next__
    return self._next()
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/grpc/_channel.py", line 969, in _next
    return self._next()
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/grpc/_channel.py", line 969, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.DEADLINE_EXCEEDED
        details = "Deadline Exceeded"
        debug_error_string = "UNKNOWN:Error received from peer {created_time:"2025-08-20T21:42:49.149954627+02:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.DEADLINE_EXCEEDED
        details = "Deadline Exceeded"
        debug_error_string = "UNKNOWN:Error received from peer {created_time:"2025-08-20T21:42:49.150497414+02:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
Exception in thread run_worker_job-001[job]_ref_Environment_default_environment_1:
Traceback (most recent call last):
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 283, in run
    getattr(self, SdkHarness.REQUEST_METHOD_PREFIX + request_type)(
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in _request_process_bundle_progress
    self._request_process_bundle_action(request)
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 352, in _request_process_bundle_action
    self._report_progress_executor.submit(task)
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/concurrent/futures/thread.py", line 169, in submit
    raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown
```