[ 
https://issues.apache.org/jira/browse/BEAM-5315?focusedWorklogId=152895&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152895
 ]

ASF GitHub Bot logged work on BEAM-5315:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Oct/18 19:58
            Start Date: 09/Oct/18 19:58
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #6590: [BEAM-5315] Partially port io
URL: https://github.com/apache/beam/pull/6590
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (it would not otherwise be shown, because GitHub hides the original diff on merge):

diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py 
b/sdks/python/apache_beam/io/filebasedsink_test.py
index 55e5a16dff8..39e2b8377fd 100644
--- a/sdks/python/apache_beam/io/filebasedsink_test.py
+++ b/sdks/python/apache_beam/io/filebasedsink_test.py
@@ -76,6 +76,10 @@ def _create_temp_file(self, name='', suffix=''):
 
 class MyFileBasedSink(filebasedsink.FileBasedSink):
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3.'
+                   'TODO: BEAM-5627, TODO:5618')
   def open(self, temp_path):
     # TODO: Fix main session pickling.
     # file_handle = super(MyFileBasedSink, self).open(temp_path)
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py 
b/sdks/python/apache_beam/io/filebasedsource_test.py
index 0e57a04356d..bedbf46c9e4 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -52,6 +52,10 @@
 
 class LineSource(FileBasedSource):
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def read_records(self, file_name, range_tracker):
     f = self.open_file(file_name)
     try:
diff --git a/sdks/python/apache_beam/io/filesystem_test.py 
b/sdks/python/apache_beam/io/filesystem_test.py
index 876ba7a38b8..c3ab88cb386 100644
--- a/sdks/python/apache_beam/io/filesystem_test.py
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -26,6 +26,7 @@
 import ntpath
 import os
 import posixpath
+import sys
 import tempfile
 import unittest
 from builtins import range
@@ -287,6 +288,10 @@ def _create_temp_file(self):
     self._tempfiles.append(path)
     return path
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def _create_compressed_file(self, compression_type, content):
     file_name = self._create_temp_file()
 
@@ -427,6 +432,10 @@ def test_read_and_seek_back_to_beginning(self):
 
         self.assertEqual(first_pass, second_pass)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_tell(self):
     lines = ['line%d\n' % i for i in range(10)]
     tmpfile = self._create_temp_file()
diff --git a/sdks/python/apache_beam/io/filesystemio.py 
b/sdks/python/apache_beam/io/filesystemio.py
index 086ae164aef..161f50f9331 100644
--- a/sdks/python/apache_beam/io/filesystemio.py
+++ b/sdks/python/apache_beam/io/filesystemio.py
@@ -214,7 +214,7 @@ def __init__(self, recv_pipe):
     self.conn = recv_pipe
     self.closed = False
     self.position = 0
-    self.remaining = ''
+    self.remaining = b''
 
   def read(self, size):
     """Read data from the wrapped pipe connection.
@@ -239,7 +239,7 @@ def read(self, size):
           self.remaining = self.conn.recv_bytes()
         except EOFError:
           break
-    return ''.join(data_list)
+    return b''.join(data_list)
 
   def tell(self):
     """Tell the file's current offset.
diff --git a/sdks/python/apache_beam/io/filesystemio_test.py 
b/sdks/python/apache_beam/io/filesystemio_test.py
index 75079a539c4..6db78e7d2cc 100644
--- a/sdks/python/apache_beam/io/filesystemio_test.py
+++ b/sdks/python/apache_beam/io/filesystemio_test.py
@@ -22,6 +22,7 @@
 import logging
 import multiprocessing
 import os
+import sys
 import threading
 import unittest
 from builtins import range
@@ -74,9 +75,9 @@ def test_file_attributes(self):
     self.assertTrue(stream.seekable())
 
   def test_read_empty(self):
-    downloader = FakeDownloader(data='')
+    downloader = FakeDownloader(data=b'')
     stream = filesystemio.DownloaderStream(downloader)
-    self.assertEqual(stream.read(), '')
+    self.assertEqual(stream.read(), b'')
 
   def test_read(self):
     data = 'abcde'
@@ -89,6 +90,10 @@ def test_read(self):
     self.assertEqual(stream.read(), data[1:])
     self.assertEqual(downloader.last_read_size, len(data) - 1)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_read_buffered(self):
     data = 'abcde'
     downloader = FakeDownloader(data)
@@ -102,6 +107,10 @@ def test_read_buffered(self):
     self.assertEqual(stream.read(), data[1:])
 
 
+@unittest.skipIf(sys.version_info[0] == 3 and
+                 os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                 'This test still needs to be fixed on Python 3'
+                 'TODO: BEAM-5627')
 class TestUploaderStream(unittest.TestCase):
 
   def test_file_attributes(self):
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py 
b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
index 3e3f51762f4..64872532db3 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
@@ -19,6 +19,7 @@
 from __future__ import absolute_import
 
 import errno
+import os
 import random
 import sys
 import unittest
@@ -28,9 +29,12 @@
 from mock import MagicMock
 
 # pylint: disable=ungrouped-imports
-from apache_beam.io.gcp.datastore.v1 import fake_datastore
-from apache_beam.io.gcp.datastore.v1 import helper
-from apache_beam.testing.test_utils import patch_retry
+try: # TODO(BEAM-4543): googledatastore dependency does not work on Python 3.
+  from apache_beam.io.gcp.datastore.v1 import fake_datastore
+  from apache_beam.io.gcp.datastore.v1 import helper
+  from apache_beam.testing.test_utils import patch_retry
+except ImportError:
+  pass
 
 # Protect against environments where apitools library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
@@ -48,6 +52,10 @@
 # pylint: enable=ungrouped-imports
 
 
+@unittest.skipIf(sys.version_info[0] == 3 and
+                 os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                 'This test still needs to be fixed on Python 3'
+                 'TODO: BEAM-4543')
 @unittest.skipIf(datastore_helper is None, 'GCP dependencies are not 
installed')
 class HelperTest(unittest.TestCase):
 
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py 
b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
index c5bfdd81cf8..617d317c788 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
@@ -19,13 +19,19 @@
 
 from __future__ import absolute_import
 
+import os
+import sys
 import unittest
 
 from mock import MagicMock
 from mock import call
 
-from apache_beam.io.gcp.datastore.v1 import fake_datastore
-from apache_beam.io.gcp.datastore.v1 import query_splitter
+# pylint: disable=ungrouped-imports
+try: # TODO(BEAM-4543): googledatastore dependency does not work on Python 3.
+  from apache_beam.io.gcp.datastore.v1 import fake_datastore
+  from apache_beam.io.gcp.datastore.v1 import query_splitter
+except ImportError:
+  pass
 
 # Protect against environments where datastore library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
@@ -36,8 +42,13 @@
 except ImportError:
   datastore_pb2 = None
 # pylint: enable=wrong-import-order, wrong-import-position
+# pylint: enable=ungrouped-imports
 
 
+@unittest.skipIf(sys.version_info[0] == 3 and
+                 os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                 'This test still needs to be fixed on Python 3'
+                 'TODO: BEAM-4543')
 @unittest.skipIf(datastore_pb2 is None, 'GCP dependencies are not installed')
 class QuerySplitterTest(unittest.TestCase):
 
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py 
b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index e0af7c962b5..10789d00e22 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -23,6 +23,7 @@
 import logging
 import os
 import random
+import sys
 import unittest
 from builtins import object
 from builtins import range
@@ -117,11 +118,15 @@ def get_range_callback(start, end):
 
       download.GetRange = get_range_callback
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def Insert(self, insert_request, upload=None):  # pylint: 
disable=invalid-name
     assert upload is not None
     generation = self.get_last_generation(insert_request.bucket,
                                           insert_request.name) + 1
-    f = FakeFile(insert_request.bucket, insert_request.name, '', generation)
+    f = FakeFile(insert_request.bucket, insert_request.name, b'', generation)
 
     # Stream data into file.
     stream = upload.stream
@@ -131,7 +136,7 @@ def Insert(self, insert_request, upload=None):  # pylint: 
disable=invalid-name
       if not data:
         break
       data_list.append(data)
-    f.contents = ''.join(data_list)
+    f.contents = b''.join(data_list)
 
     self.add_file(f)
 
@@ -484,7 +489,7 @@ def test_full_file_read(self):
     self.assertEqual(f.mode, 'r')
     f.seek(0, os.SEEK_END)
     self.assertEqual(f.tell(), file_size)
-    self.assertEqual(f.read(), '')
+    self.assertEqual(f.read(), b'')
     f.seek(0)
     self.assertEqual(f.read(), random_file.contents)
 
@@ -505,6 +510,10 @@ def test_file_random_seek(self):
           f.read(end - start + 1), random_file.contents[start:end + 1])
       self.assertEqual(f.tell(), end + 1)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_file_iterator(self):
     file_name = 'gs://gcsio-test/iterating_file'
     lines = []
@@ -526,6 +535,10 @@ def test_file_iterator(self):
 
     self.assertEqual(read_lines, line_count)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_file_read_line(self):
     file_name = 'gs://gcsio-test/read_line_file'
     lines = []
@@ -578,6 +591,10 @@ def test_file_read_line(self):
       f.seek(start)
       self.assertEqual(f.readline(), lines[line_index][chars_left:])
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_file_write(self):
     file_name = 'gs://gcsio-test/write_file'
     file_size = 5 * 1024 * 1024 + 2000
@@ -592,6 +609,10 @@ def test_file_write(self):
     self.assertEqual(
         self.client.objects.get_file(bucket, name).contents, contents)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_file_close(self):
     file_name = 'gs://gcsio-test/close_file'
     file_size = 5 * 1024 * 1024 + 2000
@@ -605,6 +626,10 @@ def test_file_close(self):
     self.assertEqual(
         self.client.objects.get_file(bucket, name).contents, contents)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_file_flush(self):
     file_name = 'gs://gcsio-test/flush_file'
     file_size = 5 * 1024 * 1024 + 2000
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py 
b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
index 8421c43e629..3dd94b7ed8c 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
@@ -21,6 +21,7 @@
 
 import io
 import logging
+import os
 import posixpath
 import sys
 import unittest
@@ -321,6 +322,10 @@ def test_create_success(self):
     expected_file = FakeFile(url, 'wb')
     self.assertEqual(self._fake_hdfs.files[url], expected_file)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_create_write_read_compressed(self):
     url = self.fs.join(self.tmpdir, 'new_file.gz')
 
@@ -358,6 +363,10 @@ def _cmpfiles(self, url1, url2):
         data2 = f2.read()
         return data1 == data2
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_copy_file(self):
     url1 = self.fs.join(self.tmpdir, 'new_file1')
     url2 = self.fs.join(self.tmpdir, 'new_file2')
@@ -368,6 +377,10 @@ def test_copy_file(self):
     self.assertTrue(self._cmpfiles(url1, url2))
     self.assertTrue(self._cmpfiles(url1, url3))
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_copy_file_overwrite_error(self):
     url1 = self.fs.join(self.tmpdir, 'new_file1')
     url2 = self.fs.join(self.tmpdir, 'new_file2')
@@ -379,6 +392,10 @@ def test_copy_file_overwrite_error(self):
         BeamIOError, r'already exists.*%s' % posixpath.basename(url2)):
       self.fs.copy([url1], [url2])
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_copy_file_error(self):
     url1 = self.fs.join(self.tmpdir, 'new_file1')
     url2 = self.fs.join(self.tmpdir, 'new_file2')
@@ -392,6 +409,10 @@ def test_copy_file_error(self):
       self.fs.copy([url1, url3], [url2, url4])
     self.assertTrue(self._cmpfiles(url3, url4))
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_copy_directory(self):
     url_t1 = self.fs.join(self.tmpdir, 't1')
     url_t1_inner = self.fs.join(self.tmpdir, 't1/inner')
@@ -409,6 +430,10 @@ def test_copy_directory(self):
     self.fs.copy([url_t1], [url_t2])
     self.assertTrue(self._cmpfiles(url1, url2))
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_copy_directory_overwrite_error(self):
     url_t1 = self.fs.join(self.tmpdir, 't1')
     url_t1_inner = self.fs.join(self.tmpdir, 't1/inner')
@@ -433,6 +458,10 @@ def test_copy_directory_overwrite_error(self):
     with self.assertRaisesRegexp(BeamIOError, r'already exists'):
       self.fs.copy([url_t1], [url_t2])
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_rename_file(self):
     url1 = self.fs.join(self.tmpdir, 'f1')
     url2 = self.fs.join(self.tmpdir, 'f2')
@@ -443,6 +472,10 @@ def test_rename_file(self):
     self.assertFalse(self.fs.exists(url1))
     self.assertTrue(self.fs.exists(url2))
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_rename_file_error(self):
     url1 = self.fs.join(self.tmpdir, 'f1')
     url2 = self.fs.join(self.tmpdir, 'f2')
@@ -457,6 +490,10 @@ def test_rename_file_error(self):
     self.assertFalse(self.fs.exists(url3))
     self.assertTrue(self.fs.exists(url4))
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_rename_directory(self):
     url_t1 = self.fs.join(self.tmpdir, 't1')
     url_t2 = self.fs.join(self.tmpdir, 't2')
@@ -478,12 +515,20 @@ def test_exists(self):
     self.assertTrue(self.fs.exists(url1))
     self.assertFalse(self.fs.exists(url2))
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_size(self):
     url = self.fs.join(self.tmpdir, 'f1')
     with self.fs.create(url) as f:
       f.write(b'Hello')
     self.assertEqual(5, self.fs.size(url))
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_checksum(self):
     url = self.fs.join(self.tmpdir, 'f1')
     with self.fs.create(url) as f:
diff --git a/sdks/python/apache_beam/io/range_trackers_test.py 
b/sdks/python/apache_beam/io/range_trackers_test.py
index 475510e5d1f..de578b8a032 100644
--- a/sdks/python/apache_beam/io/range_trackers_test.py
+++ b/sdks/python/apache_beam/io/range_trackers_test.py
@@ -22,6 +22,8 @@
 import copy
 import logging
 import math
+import os
+import sys
 import unittest
 
 from past.builtins import long
@@ -324,17 +326,32 @@ def _check(self, fraction=None, key=None, start=None, 
end=None, delta=0):
       self.assertEqual(computed_fraction, fraction, str(locals()))
     self.assertEqual(computed_key, key, str(locals()))
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be '
+                   'fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_key_to_fraction_no_endpoints(self):
     self._check(key='\x07', fraction=7/256.)
     self._check(key='\xFF', fraction=255/256.)
     self._check(key='\x01\x02\x03', fraction=(2**16 + 2**9 + 3) / (2.0**24))
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be '
+                   'fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_key_to_fraction(self):
     self._check(key='\x87', start='\x80', fraction=7/128.)
     self._check(key='\x07', end='\x10', fraction=7/16.)
     self._check(key='\x47', start='\x40', end='\x80', fraction=7/64.)
     self._check(key='\x47\x80', start='\x40', end='\x80', fraction=15/128.)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be '
+                   'fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_key_to_fraction_common_prefix(self):
     self._check(
         key='a' * 100 + 'b', start='a' * 100 + 'a', end='a' * 100 + 'c',
@@ -349,6 +366,11 @@ def test_key_to_fraction_common_prefix(self):
                 end='foob\x00\x00\x00\x00\x00\x00\x00\x00\x02',
                 fraction=0.5)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be '
+                   'fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_tiny(self):
     self._check(fraction=.5**20, key='\0\0\x10')
     self._check(fraction=.5**20, start='a', end='b', key='a\0\0\x10')
@@ -363,6 +385,11 @@ def test_tiny(self):
                 delta=1e-15)
     self._check(fraction=.5**100, key='\0' * 12 + '\x10')
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be '
+                   'fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_lots(self):
     for fraction in (0, 1, .5, .75, 7./512, 1 - 7./4096):
       self._check(fraction)
@@ -384,6 +411,11 @@ def test_lots(self):
       self._check(fraction, start='a' * 100 + '\x80', end='a' * 100 + '\x81',
                   delta=1e-14)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be '
+                   'fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_good_prec(self):
     # There should be about 7 characters (~53 bits) of precision
     # (beyond the common prefix of start and end).
diff --git a/sdks/python/apache_beam/io/source_test_utils_test.py 
b/sdks/python/apache_beam/io/source_test_utils_test.py
index 38a2e8eff74..1dd09c0f0cf 100644
--- a/sdks/python/apache_beam/io/source_test_utils_test.py
+++ b/sdks/python/apache_beam/io/source_test_utils_test.py
@@ -18,6 +18,7 @@
 from __future__ import absolute_import
 
 import logging
+import os
 import sys
 import tempfile
 import unittest
@@ -54,12 +55,20 @@ def _create_source(self, data):
     for bundle in source.split(float('inf')):
       return bundle.source
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_read_from_source(self):
     data = self._create_data(100)
     source = self._create_source(data)
     self.assertCountEqual(
         data, source_test_utils.read_from_source(source, None, None))
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_source_equals_reference_source(self):
     data = self._create_data(100)
     reference_source = self._create_source(data)
@@ -73,6 +82,10 @@ def test_source_equals_reference_source(self):
     source_test_utils.assert_sources_equal_reference_source(
         (reference_source, None, None), sources_info)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_split_at_fraction_successful(self):
     data = self._create_data(100)
     source = self._create_source(data)
@@ -97,6 +110,10 @@ def test_split_at_fraction_successful(self):
     self.assertTrue(result1[0] < result3[0])
     self.assertTrue(result1[1] > result3[1])
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_split_at_fraction_fails(self):
     data = self._create_data(100)
     source = self._create_source(data)
@@ -110,6 +127,10 @@ def test_split_at_fraction_fails(self):
       source_test_utils.assert_split_at_fraction_behavior(
           source, 10, 0.5, source_test_utils.ExpectedSplitOutcome.MUST_FAIL)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_split_at_fraction_binary(self):
     data = self._create_data(100)
     source = self._create_source(data)
@@ -122,6 +143,10 @@ def test_split_at_fraction_binary(self):
     self.assertTrue(stats.successful_fractions)
     self.assertTrue(stats.non_trivial_fractions)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_split_at_fraction_exhaustive(self):
     data = self._create_data(10)
     source = self._create_source(data)
diff --git a/sdks/python/apache_beam/io/sources_test.py 
b/sdks/python/apache_beam/io/sources_test.py
index cba8922976e..1e7de57bd9c 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -95,6 +95,10 @@ def _create_temp_file(self, contents):
       f.write(contents)
       return f.name
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_read_from_source(self):
     file_name = self._create_temp_file('aaaa\nbbbb\ncccc\ndddd')
 
@@ -104,6 +108,10 @@ def test_read_from_source(self):
 
     self.assertCountEqual(['aaaa', 'bbbb', 'cccc', 'dddd'], result)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_run_direct(self):
     file_name = self._create_temp_file('aaaa\nbbbb\ncccc\ndddd')
     pipeline = TestPipeline()
diff --git a/sdks/python/apache_beam/io/textio_test.py 
b/sdks/python/apache_beam/io/textio_test.py
index adb96d1b8d1..1bc54eeca35 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -497,6 +497,10 @@ def test_read_corrupted_bzip2_fails(self):
       with self.assertRaises(Exception):
         pipeline.run()
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_read_bzip2_concat(self):
     with TempDir() as tempdir:
       bzip2_file_name1 = tempdir.create_temp_file()
@@ -819,6 +823,10 @@ def _write_lines(self, sink, lines):
       sink.write_record(f, line)
     sink.close(f)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_write_text_file(self):
     sink = TextSink(self.path)
     self._write_lines(sink, self.lines)
@@ -833,6 +841,10 @@ def test_write_text_file_empty(self):
     with open(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), [])
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_write_bzip2_file(self):
     sink = TextSink(
         self.path, compression_type=CompressionTypes.BZIP2)
@@ -841,6 +853,10 @@ def test_write_bzip2_file(self):
     with bz2.BZ2File(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), self.lines)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_write_bzip2_file_auto(self):
     self.path = self._create_temp_file(suffix='.bz2')
     sink = TextSink(self.path)
@@ -849,6 +865,10 @@ def test_write_bzip2_file_auto(self):
     with bz2.BZ2File(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), self.lines)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_write_gzip_file(self):
     sink = TextSink(
         self.path, compression_type=CompressionTypes.GZIP)
@@ -857,6 +877,10 @@ def test_write_gzip_file(self):
     with gzip.GzipFile(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), self.lines)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_write_gzip_file_auto(self):
     self.path = self._create_temp_file(suffix='.gz')
     sink = TextSink(self.path)
@@ -865,6 +889,10 @@ def test_write_gzip_file_auto(self):
     with gzip.GzipFile(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), self.lines)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_write_gzip_file_empty(self):
     sink = TextSink(
         self.path, compression_type=CompressionTypes.GZIP)
@@ -873,6 +901,10 @@ def test_write_gzip_file_empty(self):
     with gzip.GzipFile(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), [])
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_write_text_file_with_header(self):
     header = 'header1\nheader2'
     sink = TextSink(self.path, header=header)
@@ -881,6 +913,10 @@ def test_write_text_file_with_header(self):
     with open(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), header.splitlines() + self.lines)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_write_text_file_empty_with_header(self):
     header = 'header1\nheader2'
     sink = TextSink(self.path, header=header)
@@ -889,6 +925,10 @@ def test_write_text_file_empty_with_header(self):
     with open(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), header.splitlines())
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_write_dataflow(self):
     pipeline = TestPipeline()
     pcoll = pipeline | beam.core.Create(self.lines)
diff --git a/sdks/python/apache_beam/io/vcfio_test.py 
b/sdks/python/apache_beam/io/vcfio_test.py
index 25b5d0cfa09..22d23c64718 100644
--- a/sdks/python/apache_beam/io/vcfio_test.py
+++ b/sdks/python/apache_beam/io/vcfio_test.py
@@ -21,6 +21,7 @@
 
 import logging
 import os
+import sys
 import unittest
 from itertools import chain
 from itertools import permutations
@@ -287,6 +288,10 @@ def test_read_file_pattern_large(self):
         os.path.join(get_full_dir(), 'valid-*.vcf.gz'))
     self.assertEqual(9900, len(read_data_gz))
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_single_file_no_records(self):
     self.assertEqual(
         [], self._create_temp_file_and_read_records(['']))
@@ -295,6 +300,10 @@ def test_single_file_no_records(self):
     self.assertEqual(
         [], self._create_temp_file_and_read_records(_SAMPLE_HEADER_LINES))
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_single_file_verify_details(self):
     variant_1, vcf_line_1 = self._get_sample_variant_1()
     read_data = self._create_temp_file_and_read_records(
@@ -308,6 +317,10 @@ def test_single_file_verify_details(self):
     self.assertEqual(3, len(read_data))
     self._assert_variants_equal([variant_1, variant_2, variant_3], read_data)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_file_pattern_verify_details(self):
     variant_1, vcf_line_1 = self._get_sample_variant_1()
     variant_2, vcf_line_2 = self._get_sample_variant_2()
@@ -336,6 +349,10 @@ def test_read_after_splitting(self):
       split_records.extend(source_test_utils.read_from_source(*source_info))
     self.assertEqual(9882, len(split_records))
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_invalid_file(self):
     invalid_file_contents = self._get_invalid_file_contents()
     for content in chain(*invalid_file_contents):
@@ -347,6 +364,10 @@ def test_invalid_file(self):
         self._create_temp_vcf_file(content, tempdir)
       self._read_records(os.path.join(tempdir.get_path(), '*.vcf'))
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_allow_malformed_records(self):
     invalid_records, invalid_headers = self._get_invalid_file_contents()
 
@@ -365,6 +386,10 @@ def test_allow_malformed_records(self):
         self._read_records(self._create_temp_vcf_file(content, tempdir),
                            allow_malformed_records=True)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_no_samples(self):
     header_line = '#CHROM      POS     ID      REF     ALT     QUAL    FILTER  
INFO\n'
     record_line = '19  123     .       G       A       .       PASS    AF=0.2'
@@ -377,6 +402,10 @@ def test_no_samples(self):
     self.assertEqual(1, len(read_data))
     self.assertEqual(expected_variant, read_data[0])
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_no_info(self):
     record_line = 'chr19       123     .       .       .       .       .       
.       GT      .       .'
     expected_variant = Variant(reference_name='chr19', start=122, end=123)
@@ -389,6 +418,10 @@ def test_no_info(self):
     self.assertEqual(1, len(read_data))
     self.assertEqual(expected_variant, read_data[0])
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_info_numbers_and_types(self):
     info_headers = [
         '##INFO=<ID=HA,Number=A,Type=String,Description="StringInfo_A">\n',
@@ -422,6 +455,10 @@ def test_info_numbers_and_types(self):
     self.assertEqual(2, len(read_data))
     self._assert_variants_equal([variant_1, variant_2], read_data)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_end_info_key(self):
     phaseset_header_line = (
         '##INFO=<ID=END,Number=1,Type=Integer,Description="End of record.">\n')
@@ -440,6 +477,10 @@ def test_end_info_key(self):
     self.assertEqual(2, len(read_data))
     self._assert_variants_equal([variant_1, variant_2], read_data)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_custom_phaseset(self):
     phaseset_header_line = (
         '##FORMAT=<ID=PS,Number=1,Type=Integer,Description="Phaseset">\n')
@@ -463,6 +504,10 @@ def test_custom_phaseset(self):
     self.assertEqual(2, len(read_data))
     self._assert_variants_equal([variant_1, variant_2], read_data)
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_format_numbers(self):
     format_headers = [
         
'##FORMAT=<ID=FU,Number=.,Type=String,Description="Format_variable">\n',
@@ -486,6 +531,10 @@ def test_format_numbers(self):
     self.assertEqual(1, len(read_data))
     self.assertEqual(expected_variant, read_data[0])
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_pipeline_read_single_file(self):
     with TempDir() as tempdir:
       file_name = self._create_temp_vcf_file(_SAMPLE_HEADER_LINES +
@@ -511,6 +560,10 @@ def test_pipeline_read_file_pattern_large(self):
     assert_that(pcoll, _count_equals_to(9900))
     pipeline.run()
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_read_reentrant_without_splitting(self):
     with TempDir() as tempdir:
       file_name = self._create_temp_vcf_file(_SAMPLE_HEADER_LINES +
@@ -518,6 +571,10 @@ def test_read_reentrant_without_splitting(self):
       source = VcfSource(file_name)
       source_test_utils.assert_reentrant_reads_succeed((source, None, None))
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_read_reentrant_after_splitting(self):
     with TempDir() as tempdir:
       file_name = self._create_temp_vcf_file(_SAMPLE_HEADER_LINES +
@@ -528,6 +585,10 @@ def test_read_reentrant_after_splitting(self):
       source_test_utils.assert_reentrant_reads_succeed(
           (splits[0].source, splits[0].start_position, 
splits[0].stop_position))
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test still needs to be fixed on Python 3'
+                   'TODO: BEAM-5627')
   def test_dynamic_work_rebalancing(self):
     with TempDir() as tempdir:
       file_name = self._create_temp_vcf_file(_SAMPLE_HEADER_LINES +
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index ce94945a23e..a3db7903fca 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -140,7 +140,7 @@ def get_version():
     # oauth2client >=4 only works with google-apitools>=0.5.18.
     'google-apitools>=0.5.18,<=0.5.20',
     'proto-google-cloud-datastore-v1>=0.90.0,<=0.90.4',
-    'googledatastore==7.0.1',
+    'googledatastore==7.0.1; python_version < "3.0"',
     'google-cloud-pubsub==0.35.4',
     # GCP packages required by tests
     'google-cloud-bigquery==0.25.0',
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index bf9175f972b..5ed55e9635e 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -56,8 +56,9 @@ commands =
 [testenv:py3]
 setenv =
   BEAM_EXPERIMENTAL_PY3=1
+  RUN_SKIPPED_PY3_TESTS=0
 modules =
-  
apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners
+  
apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp.gcsfilesystem_test,apache_beam.io.gcp.gcsio_test,apache_beam.io.gcp.pubsub_integration_test,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.internal,apache_beam.io.filesystem_test,apache_beam.io.filesystems,apache_beam.io.range_trackers_test,apache_beam.io.sources_test
 commands =
   python --version
   pip --version


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 152895)
    Time Spent: 4h  (was: 3h 50m)

> Finish Python 3 porting for io module
> -------------------------------------
>
>                 Key: BEAM-5315
>                 URL: https://issues.apache.org/jira/browse/BEAM-5315
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Robbe
>            Assignee: Simon
>            Priority: Major
>          Time Spent: 4h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to