Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193286932 --- Diff: python/pyspark/sql/tests.py --- @@ -1884,7 +1885,164 @@ def test_query_manager_await_termination(self): finally: q.stop() shutil.rmtree(tmpPath) + ''' + class ForeachWriterTester: + + def __init__(self, spark): + self.spark = spark + self.input_dir = tempfile.mkdtemp() + self.open_events_dir = tempfile.mkdtemp() + self.process_events_dir = tempfile.mkdtemp() + self.close_events_dir = tempfile.mkdtemp() + + def write_open_event(self, partitionId, epochId): + self._write_event( + self.open_events_dir, + {'partition': partitionId, 'epoch': epochId}) + + def write_process_event(self, row): + self._write_event(self.process_events_dir, {'value': 'text'}) + + def write_close_event(self, error): + self._write_event(self.close_events_dir, {'error': str(error)}) + + def write_input_file(self): + self._write_event(self.input_dir, "text") + + def open_events(self): + return self._read_events(self.open_events_dir, 'partition INT, epoch INT') + + def process_events(self): + return self._read_events(self.process_events_dir, 'value STRING') + + def close_events(self): + return self._read_events(self.close_events_dir, 'error STRING') + + def run_streaming_query_on_writer(self, writer, num_files): + try: + sdf = self.spark.readStream.format('text').load(self.input_dir) + sq = sdf.writeStream.foreach(writer).start() + for i in range(num_files): + self.write_input_file() + sq.processAllAvailable() + sq.stop() + finally: + self.stop_all() + + def _read_events(self, dir, json): + rows = self.spark.read.schema(json).json(dir).collect() + dicts = [row.asDict() for row in rows] + return dicts + + def _write_event(self, dir, event): + import random + file = open(os.path.join(dir, str(random.randint(0, 100000))), 'w') --- End diff -- We might feel more convenient with `with` statement, and renaming `file` to `f` or `fw` or so. Please ignore if there's specific reason not to use `with` statement.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org