[ https://issues.apache.org/jira/browse/BEAM-4444?focusedWorklogId=162688&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162688 ]

ASF GitHub Bot logged work on BEAM-4444:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Nov/18 18:51
            Start Date: 05/Nov/18 18:51
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on a change in pull request 
#6763: [BEAM-4444] Parquet IO for Python SDK
URL: https://github.com/apache/beam/pull/6763#discussion_r230867152
 
 

 ##########
 File path: sdks/python/apache_beam/io/parquetio_test.py
 ##########
 @@ -0,0 +1,376 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import sys
+import tempfile
+import unittest
+
+import hamcrest as hc
+import pyarrow as pa
+import pyarrow.parquet as pq
+
+from apache_beam import Create
+from apache_beam import Map
+from apache_beam.io import filebasedsource
+from apache_beam.io import source_test_utils
+from apache_beam.io.iobase import RangeTracker
+from apache_beam.io.parquetio import ReadAllFromParquet
+from apache_beam.io.parquetio import ReadFromParquet
+from apache_beam.io.parquetio import WriteToParquet
+from apache_beam.io.parquetio import _create_parquet_sink
+from apache_beam.io.parquetio import _create_parquet_source
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms.display import DisplayData
+from apache_beam.transforms.display_test import DisplayDataItemMatcher
+
+
+class TestParquet(unittest.TestCase):
+  _temp_files = []
+
+  @classmethod
+  def setUpClass(cls):
+    # Method has been renamed in Python 3
+    if sys.version_info[0] < 3:
+      cls.assertCountEqual = cls.assertItemsEqual
+
+  def setUp(self):
+    # Reducing the size of thread pools. Without this, test execution may
+    # fail in environments with a limited amount of resources.
+    filebasedsource.MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 2
+
+  def tearDown(self):
+    for path in self._temp_files:
+      if os.path.exists(path):
+        os.remove(path)
+        parent = os.path.dirname(path)
+        if not os.listdir(parent):
+          os.rmdir(parent)
+    self._temp_files = []
+
+  RECORDS = [
+      {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'},
+      {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'},
+      {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'},
+      {'name': 'Gordon', 'favorite_number': 4, 'favorite_color': 'blue'},
+      {'name': 'Emily', 'favorite_number': -1, 'favorite_color': 'Red'},
+      {'name': 'Percy', 'favorite_number': 6, 'favorite_color': 'Green'},
+  ]
+
+  SCHEMA = pa.schema([
+      ('name', pa.binary()),
+      ('favorite_number', pa.int64()),
+      ('favorite_color', pa.binary())
+  ])
+
+  def _record_to_columns(self, records, schema):
+    col_list = []
+    for n in schema.names:
+      column = []
+      for r in records:
+        column.append(r[n])
+      col_list.append(column)
+    return col_list
+
+  def _write_data(self,
+                  directory=None,
+                  prefix=tempfile.template,
+                  row_group_size=1000,
+                  codec='none',
+                  count=len(RECORDS)):
+
+    with tempfile.NamedTemporaryFile(
+        delete=False, dir=directory, prefix=prefix) as f:
+      len_records = len(self.RECORDS)
+      data = []
+      for i in range(count):
+        data.append(self.RECORDS[i % len_records])
+      col_data = self._record_to_columns(data, self.SCHEMA)
+      col_array = [pa.array(c) for c in col_data]
+      table = pa.Table.from_arrays(col_array, self.SCHEMA.names)
+      pq.write_table(
+          table, f, row_group_size=row_group_size, compression=codec)
+
+      self._temp_files.append(f.name)
+      return f.name
+
+  def _write_pattern(self, num_files):
+    assert num_files > 0
+    temp_dir = tempfile.mkdtemp()
+
+    file_name = None
+    for _ in range(num_files):
+      file_name = self._write_data(directory=temp_dir, prefix='mytemp')
+
+    assert file_name
+    file_name_prefix = file_name[:file_name.rfind(os.path.sep)]
+    return file_name_prefix + os.path.sep + 'mytemp*'
+
+  def _run_parquet_test(self, pattern, columns, desired_bundle_size,
+                        perform_splitting, expected_result):
+    source = _create_parquet_source(pattern, columns=columns)
+    if perform_splitting:
+      assert desired_bundle_size
+      sources_info = [
+          (split.source, split.start_position, split.stop_position)
+          for split in source.split(desired_bundle_size=desired_bundle_size)
+      ]
+      if len(sources_info) < 2:
+        raise ValueError('Test is trivial. Please adjust it so that at least '
+                         'two splits get generated')
+
+      source_test_utils.assert_sources_equal_reference_source(
+          (source, None, None), sources_info)
+    else:
+      read_records = source_test_utils.read_from_source(source, None, None)
+      self.assertCountEqual(expected_result, read_records)
+
+  def test_read_without_splitting(self):
+    file_name = self._write_data()
+    expected_result = self.RECORDS
+    self._run_parquet_test(file_name, None, None, False, expected_result)
+
+  def test_read_with_splitting(self):
+    file_name = self._write_data()
+    expected_result = self.RECORDS
+    self._run_parquet_test(file_name, None, 100, True, expected_result)
+
+  def test_source_display_data(self):
+    file_name = 'some_parquet_source'
+    source = _create_parquet_source(file_name, validate=False)
+    dd = DisplayData.create_from(source)
+
+    expected_items = [
+        DisplayDataItemMatcher('compression', 'auto'),
+        DisplayDataItemMatcher('file_pattern', file_name)]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+  def test_read_display_data(self):
+    file_name = 'some_parquet_source'
+    read = ReadFromParquet(file_name, validate=False)
+    dd = DisplayData.create_from(read)
+
+    expected_items = [
+        DisplayDataItemMatcher('compression', 'auto'),
+        DisplayDataItemMatcher('file_pattern', file_name)]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+  def test_sink_display_data(self):
+    file_name = 'some_parquet_sink'
+    sink = _create_parquet_sink(
+        file_name,
+        self.SCHEMA,
+        'none',
+        1,
+        '.end',
+        0,
+        None,
+        'application/x-parquet')
+    dd = DisplayData.create_from(sink)
+    expected_items = [
+        DisplayDataItemMatcher(
+            'schema',
+            str(self.SCHEMA)),
+        DisplayDataItemMatcher(
+            'file_pattern',
+            'some_parquet_sink-%(shard_num)05d-of-%(num_shards)05d.end'),
+        DisplayDataItemMatcher(
+            'codec',
+            'none'),
+        DisplayDataItemMatcher(
+            'compression',
+            'uncompressed')]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+  def test_write_display_data(self):
+    file_name = 'some_parquet_sink'
+    write = WriteToParquet(file_name, self.SCHEMA)
+    dd = DisplayData.create_from(write)
+    expected_items = [
+        DisplayDataItemMatcher(
+            'schema',
+            str(self.SCHEMA)),
+        DisplayDataItemMatcher(
+            'file_pattern',
+            'some_parquet_sink-%(shard_num)05d-of-%(num_shards)05d'),
+        DisplayDataItemMatcher(
+            'codec',
+            'none'),
+        DisplayDataItemMatcher(
+            'compression',
+            'uncompressed')]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+  def test_sink_transform(self):
+    with tempfile.NamedTemporaryFile() as dst:
+      path = dst.name
+      with TestPipeline() as p:
+        # pylint: disable=expression-not-assigned
+        p \
+        | Create(self.RECORDS) \
+        | WriteToParquet(
+            path, self.SCHEMA, num_shards=1, shard_name_template='')
+      with TestPipeline() as p:
+        # json used for stable sortability
+        readback = \
+            p \
+            | ReadFromParquet(path) \
+            | Map(json.dumps)
+        assert_that(readback, equal_to([json.dumps(r) for r in self.RECORDS]))
+
+  def test_sink_transform_snappy(self):
 
 Review comment:
   Please add tests for all supported compression types for read and write.
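   One possible shape for such coverage is a single round-trip test that
   reuses the RECORDS fixture and the pipeline pattern from
   test_sink_transform above. This is only a sketch: it assumes WriteToParquet
   exposes the codec argument that the display-data tests exercise, and the
   codec list is an assumption ('snappy' and 'gzip' are commonly available in
   pyarrow builds; 'brotli', 'lz4' and 'zstd' depend on how pyarrow was
   built).

       def test_sink_transform_compression_roundtrip(self):
         # Hypothetical test name; the codec list may need to be trimmed to
         # match what the local pyarrow build actually supports.
         for codec in ['snappy', 'gzip']:
           with tempfile.NamedTemporaryFile() as dst:
             path = dst.name
             with TestPipeline() as p:
               # pylint: disable=expression-not-assigned
               p \
               | Create(self.RECORDS) \
               | WriteToParquet(
                   path, self.SCHEMA, codec=codec,
                   num_shards=1, shard_name_template='')
             with TestPipeline() as p:
               # Read the compressed file back and compare against the
               # original records (json used for stable sortability).
               readback = \
                   p \
                   | ReadFromParquet(path) \
                   | Map(json.dumps)
               assert_that(
                   readback,
                   equal_to([json.dumps(r) for r in self.RECORDS]))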

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 162688)
    Time Spent: 4h 50m  (was: 4h 40m)

> Parquet IO for Python SDK
> -------------------------
>
>                 Key: BEAM-4444
>                 URL: https://issues.apache.org/jira/browse/BEAM-4444
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Bruce Arctor
>            Assignee: Heejong Lee
>            Priority: Major
>          Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> Add Parquet Support for the Python SDK.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
