[ https://issues.apache.org/jira/browse/BEAM-8561?focusedWorklogId=385489&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-385489 ]
ASF GitHub Bot logged work on BEAM-8561: ---------------------------------------- Author: ASF GitHub Bot Created on: 11/Feb/20 21:59 Start Date: 11/Feb/20 21:59 Worklog Time Spent: 10m Work Description: chrlarsen commented on pull request #10290: [BEAM-8561] Add ThriftIO to support IO for Thrift files URL: https://github.com/apache/beam/pull/10290#discussion_r377925387 ########## File path: sdks/java/io/thrift/src/test/java/org/apache/beam/sdk/io/thrift/ThriftIOTest.java ########## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.thrift; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.RandomStringUtils; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.RandomUtils; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TJSONProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.protocol.TSimpleJSONProtocol; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link ThriftIO}. */ +@RunWith(JUnit4.class) +public class ThriftIOTest implements Serializable { + + private static final String RESOURCE_DIR = "ThriftIOTest/"; + + private static final String THRIFT_DIR = Resources.getResource(RESOURCE_DIR).getPath(); + private static final String ALL_THRIFT_STRING = + Resources.getResource(RESOURCE_DIR).getPath() + "*"; + private static final TestThriftStruct TEST_THRIFT_STRUCT = new TestThriftStruct(); + private static List<TestThriftStruct> testThriftStructs; + private final TProtocolFactory tBinaryProtoFactory = new TBinaryProtocol.Factory(); + private final TProtocolFactory tJsonProtocolFactory = new TJSONProtocol.Factory(); + private final TProtocolFactory tSimpleJsonProtocolFactory = new TSimpleJSONProtocol.Factory(); + private final TProtocolFactory tCompactProtocolFactory = new TCompactProtocol.Factory(); + @Rule public transient TestPipeline mainPipeline = TestPipeline.create(); + @Rule public transient TestPipeline readPipeline = TestPipeline.create(); + @Rule public transient TestPipeline writePipeline = TestPipeline.create(); + @Rule public transient TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Before + public void setUp() throws Exception { + byte[] bytes = new byte[10]; + ByteBuffer buffer = ByteBuffer.wrap(bytes); + + TEST_THRIFT_STRUCT.testByte = 100; + TEST_THRIFT_STRUCT.testShort = 200; + TEST_THRIFT_STRUCT.testInt = 2500; + TEST_THRIFT_STRUCT.testLong = 79303L; + TEST_THRIFT_STRUCT.testDouble = 25.007; + TEST_THRIFT_STRUCT.testBool = true; + TEST_THRIFT_STRUCT.stringIntMap = new HashMap<>(); + TEST_THRIFT_STRUCT.stringIntMap.put("first", (short) 1); + TEST_THRIFT_STRUCT.stringIntMap.put("second", (short) 2); + TEST_THRIFT_STRUCT.testBinary = buffer; + + testThriftStructs = ImmutableList.copyOf(generateTestObjects(1000L)); + } + + /** Tests {@link ThriftIO#readFiles(Class)} with {@link TBinaryProtocol}. */ + @Test + public void testReadFilesBinaryProtocol() { + + PCollection<TestThriftStruct> testThriftDoc = + mainPipeline + .apply(Create.of(THRIFT_DIR + "data").withCoder(StringUtf8Coder.of())) + .apply(FileIO.matchAll()) + .apply(FileIO.readMatches()) + .apply(ThriftIO.readFiles(TestThriftStruct.class).withProtocol(tBinaryProtoFactory)); + + // Assert + PAssert.that(testThriftDoc).containsInAnyOrder(TEST_THRIFT_STRUCT); + + // Execute pipeline + mainPipeline.run(); + } + + /** + * Tests {@link ThriftIO#sink(TProtocolFactory)} and {@link ThriftIO#readFiles(Class)} with {@link + * TBinaryProtocol}. + */ + @Test + public void testReadWriteBinaryProtocol() { + + mainPipeline + .apply(Create.of(testThriftStructs).withCoder(ThriftCoder.of())) + .apply( + FileIO.<TestThriftStruct>write() + .via(ThriftIO.sink(tBinaryProtoFactory)) + .to(temporaryFolder.getRoot().getAbsolutePath())); + + // Execute write pipeline + mainPipeline.run().waitUntilFinish(); + + // Read written files + PCollection<TestThriftStruct> readDocs = + readPipeline + .apply( + Create.of(temporaryFolder.getRoot().getAbsolutePath() + "/*") + .withCoder(StringUtf8Coder.of())) + .apply(FileIO.matchAll()) + .apply(FileIO.readMatches()) + .apply(ThriftIO.readFiles(TestThriftStruct.class).withProtocol(tBinaryProtoFactory)); + + // Assert + PAssert.that(readDocs).containsInAnyOrder(testThriftStructs); + + // Execute read pipeline + readPipeline.run().waitUntilFinish(); + } + + /** + * Tests {@link ThriftIO#sink(TProtocolFactory)} and {@link ThriftIO#readFiles(Class)} with {@link + * TJSONProtocol}. + */ + @Test + public void testReadWriteJsonProtocol() { + + mainPipeline + .apply(Create.of(testThriftStructs).withCoder(ThriftCoder.of())) + .apply( + FileIO.<TestThriftStruct>write() + .via(ThriftIO.sink(tJsonProtocolFactory)) + .to(temporaryFolder.getRoot().getAbsolutePath())); + + // Execute write pipeline + mainPipeline.run().waitUntilFinish(); + + // Read written files + PCollection<TestThriftStruct> readDocs = + readPipeline + .apply( + Create.of(temporaryFolder.getRoot().getAbsolutePath() + "/*") + .withCoder(StringUtf8Coder.of())) + .apply(FileIO.matchAll()) + .apply(FileIO.readMatches()) + .apply(ThriftIO.readFiles(TestThriftStruct.class).withProtocol(tJsonProtocolFactory)); + + // Assert + PAssert.that(readDocs).containsInAnyOrder(testThriftStructs); + + // Execute read pipeline + readPipeline.run().waitUntilFinish(); + } + + /** + * Tests {@link ThriftIO#sink(TProtocolFactory)} and {@link ThriftIO#readFiles(Class)} with {@link + * TCompactProtocol}. + */ + @Test + public void testReadWriteCompactProtocol() { + Review comment: Done, removed empty spaces from test cases. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 385489) Time Spent: 14h (was: 13h 50m) > Add ThriftIO to Support IO for Thrift Files > ------------------------------------------- > > Key: BEAM-8561 > URL: https://issues.apache.org/jira/browse/BEAM-8561 > Project: Beam > Issue Type: New Feature > Components: io-java-files > Reporter: Chris Larsen > Assignee: Chris Larsen > Priority: Major > Time Spent: 14h > Remaining Estimate: 0h > > Similar to AvroIO it would be very useful to support reading and writing > to/from Thrift files with a native connector. > Functionality would include: > # read() - Reading from one or more Thrift files. > # write() - Writing to one or more Thrift files. -- This message was sent by Atlassian Jira (v8.3.4#803005)