This is an automated email from the ASF dual-hosted git repository.

xumingming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b0991a47a0e1a7bba416be1c0b3781c298966731
Author: mingmxu <ming...@ebay.com>
AuthorDate: Sun Feb 25 22:14:25 2018 -0800

    add setup/teardown for BeamSqlSeekableTable.
---
 .../apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java | 10 ++++++++++
 .../extensions/sql/impl/transform/BeamJoinTransforms.java    | 12 +++++++++++-
 .../sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java      |  8 ++++++++
 3 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java
index d274dd9..95165a5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java
@@ -29,7 +29,17 @@ import org.apache.beam.sdk.values.Row;
 @Experimental
 public interface BeamSqlSeekableTable extends Serializable{
   /**
+   * prepare the instance.
+   */
+  void setup();
+
+  /**
    * return a list of {@code Row} with given key set.
    */
   List<Row> seekRow(Row lookupSubRow);
+
+  /**
+   * cleanup resources of the instance.
+   */
+  void teardown();
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
index 648b973..2b44814 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
@@ -219,7 +219,12 @@ public class BeamJoinTransforms {
 
     @Override
     public PCollection<Row> expand(PCollection<Row> input) {
-      return input.apply("join_as_lookup", ParDo.of(new DoFn<Row, Row>() {
+      return input.apply("join_as_lookup", ParDo.of(new DoFn<Row, Row>(){
+        @Setup
+        public void setup(){
+          seekableTable.setup();
+        }
+
         @ProcessElement
         public void processElement(ProcessContext context) {
           Row factRow = context.element();
@@ -230,6 +235,11 @@ public class BeamJoinTransforms {
           }
         }
 
+        @Teardown
+        public void teardown(){
+          seekableTable.teardown();
+        }
+
         private Row extractJoinSubRow(Row factRow) {
           List<Object> joinSubsetValues =
               factJoinIdx
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index ca639bd..6a06123 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -131,6 +131,14 @@ public class BeamJoinRelUnboundedVsBoundedTest extends BaseRelTest {
     public List<Row> seekRow(Row lookupSubRow) {
       return Arrays.asList(Row.withRowType(getRowType()).addValues(1, "SITE1").build());
     }
+
+    @Override
+    public void setup() {
+    }
+
+    @Override
+    public void teardown() {
+    }
   }
 
   @Test

-- 
To stop receiving notification emails like this one, please contact
xumingm...@apache.org.

Reply via email to