Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21477#discussion_r193286932
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -1884,7 +1885,164 @@ def test_query_manager_await_termination(self):
             finally:
                 q.stop()
                 shutil.rmtree(tmpPath)
    +    '''
     
    +    class ForeachWriterTester:
    +
    +        def __init__(self, spark):
    +            self.spark = spark
    +            self.input_dir = tempfile.mkdtemp()
    +            self.open_events_dir = tempfile.mkdtemp()
    +            self.process_events_dir = tempfile.mkdtemp()
    +            self.close_events_dir = tempfile.mkdtemp()
    +
    +        def write_open_event(self, partitionId, epochId):
    +            self._write_event(
    +                self.open_events_dir,
    +                {'partition': partitionId, 'epoch': epochId})
    +
    +        def write_process_event(self, row):
    +            self._write_event(self.process_events_dir, {'value': 'text'})
    +
    +        def write_close_event(self, error):
    +            self._write_event(self.close_events_dir, {'error': str(error)})
    +
    +        def write_input_file(self):
    +            self._write_event(self.input_dir, "text")
    +
    +        def open_events(self):
    +            return self._read_events(self.open_events_dir, 'partition INT, 
epoch INT')
    +
    +        def process_events(self):
    +            return self._read_events(self.process_events_dir, 'value 
STRING')
    +
    +        def close_events(self):
    +            return self._read_events(self.close_events_dir, 'error STRING')
    +
    +        def run_streaming_query_on_writer(self, writer, num_files):
    +            try:
    +                sdf = 
self.spark.readStream.format('text').load(self.input_dir)
    +                sq = sdf.writeStream.foreach(writer).start()
    +                for i in range(num_files):
    +                    self.write_input_file()
    +                    sq.processAllAvailable()
    +                sq.stop()
    +            finally:
    +                self.stop_all()
    +
    +        def _read_events(self, dir, json):
    +            rows = self.spark.read.schema(json).json(dir).collect()
    +            dicts = [row.asDict() for row in rows]
    +            return dicts
    +
    +        def _write_event(self, dir, event):
    +            import random
    +            file = open(os.path.join(dir, str(random.randint(0, 100000))), 
'w')
    --- End diff --
    
    It might be more convenient to use a `with` statement here, and to rename 
`file` to `f` or `fw` or similar. Please ignore this if there is a specific 
reason not to use a `with` statement.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to