SuXingLee closed pull request #6769: [FLINK-10447][HDFS Connector] Create Bucketing Table Sink
URL: https://github.com/apache/flink/pull/6769
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-filesystem/pom.xml 
b/flink-connectors/flink-connector-filesystem/pom.xml
index 592db4b5106..55425277cbc 100644
--- a/flink-connectors/flink-connector-filesystem/pom.xml
+++ b/flink-connectors/flink-connector-filesystem/pom.xml
@@ -65,6 +65,15 @@ under the License.
                        <optional>true</optional>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+                       <!-- Projects depending on this project won't depend on flink-table. -->
+                       <optional>true</optional>
+               </dependency>
+
                <!-- test dependencies -->
 
                <dependency>
@@ -105,6 +114,14 @@ under the License.
                        <type>test-jar</type>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-hdfs</artifactId>
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingTableSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingTableSink.java
new file mode 100644
index 00000000000..4c2899bd172
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingTableSink.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.util.TableConnectorUtil;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+
+/**
+ * An bucketing Table sink.
+ */
+public class BucketingTableSink implements AppendStreamTableSink<Row> {
+
+       private final BucketingSink<Row> sink;
+       private String[] fieldNames;
+       private TypeInformation<?>[] fieldTypes;
+
+       /**
+        * Creates a new {@code BucketingTableSink} that writes table rows to 
the given base directory.
+        *
+        * @param sink The BucketingSink to which to write the table rows.
+        */
+       public BucketingTableSink(BucketingSink<Row> sink) {
+               this.sink = sink;
+       }
+
+       /**
+        * A builder to configure and build the BucketingTableSink.
+        *
+        * @param basePath The directory to which to write the bucket files.
+        */
+       public static BucketingTableSinkBuilder builder(String basePath) {
+               return new BucketingTableSinkBuilder(basePath);
+       }
+
+       @Override
+       public void emitDataStream(DataStream<Row> dataStream) {
+               
dataStream.addSink(sink).name(TableConnectorUtil.generateRuntimeName(this.getClass(),
 fieldNames));
+       }
+
+       @Override
+       public TypeInformation<Row> getOutputType() {
+               return new RowTypeInfo(fieldTypes, fieldNames);
+       }
+
+       @Override
+       public String[] getFieldNames() {
+               return fieldNames;
+       }
+
+       @Override
+       public TypeInformation<?>[] getFieldTypes() {
+               return fieldTypes;
+       }
+
+       @Override
+       public TableSink<Row> configure(String[] fieldNames, 
TypeInformation<?>[] fieldTypes) {
+               BucketingTableSink copy;
+               try {
+                       copy = new 
BucketingTableSink(InstantiationUtil.clone(sink));
+               } catch (IOException | ClassNotFoundException e) {
+                       throw new RuntimeException(e);
+               }
+               copy.fieldNames = fieldNames;
+               copy.fieldTypes = fieldTypes;
+               return copy;
+       }
+}
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingTableSinkBuilder.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingTableSinkBuilder.java
new file mode 100644
index 00000000000..87ee877f3d1
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingTableSinkBuilder.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.types.Row;
+
+/**
+ * A builder to configure and build the BucketingTableSink.
+ */
+public class BucketingTableSinkBuilder {
+
+       private final BucketingSink<Row> sink;
+
+       /**
+        * Specify the basePath of BucketingTableSink.
+        *
+        * @param basePath the basePath of the BucketingTableSink.
+        */
+       public BucketingTableSinkBuilder(String basePath) {
+               this.sink = new BucketingSink<Row>(basePath);
+       }
+
+       /**
+        * Specify the batchSize of BucketingTableSink.
+        *
+        * @param batchSize the batchSize of the BucketingTableSink.
+        */
+       public BucketingTableSinkBuilder setBatchSize(long batchSize) {
+               this.sink.setBatchSize(batchSize);
+               return this;
+       }
+
+       /**
+        * Specify the batchRolloverInterval of BucketingTableSink.
+        *
+        * @param batchRolloverInterval the batchRolloverInterval of the 
BucketingTableSink.
+        */
+       public BucketingTableSinkBuilder setBatchRolloverInterval(long 
batchRolloverInterval) {
+               this.sink.setBatchRolloverInterval(batchRolloverInterval);
+               return this;
+       }
+
+       /**
+        * Specify the bucketCheckInterval of BucketingTableSink.
+        *
+        * @param interval the bucketCheckInterval of the BucketingTableSink.
+        */
+       public BucketingTableSinkBuilder setInactiveBucketCheckInterval(long 
interval) {
+               this.sink.setInactiveBucketCheckInterval(interval);
+               return this;
+       }
+
+       /**
+        * Specify the inactiveBucketThreshold of BucketingTableSink.
+        *
+        * @param threshold the inactiveBucketThreshold of the 
BucketingTableSink.
+        */
+       public BucketingTableSinkBuilder setInactiveBucketThreshold(long 
threshold) {
+               this.sink.setInactiveBucketThreshold(threshold);
+               return this;
+       }
+
+       /**
+        * Specify the bucketer of BucketingTableSink.
+        *
+        * @param bucketer the bucketer of the BucketingTableSink.
+        */
+       public BucketingTableSinkBuilder setBucketer(Bucketer<Row> bucketer) {
+               this.sink.setBucketer(bucketer);
+               return this;
+       }
+
+       /**
+        * Specify the writer of BucketingTableSink.
+        *
+        * @param writer the writer of the BucketingTableSink.
+        */
+       public BucketingTableSinkBuilder setWriter(Writer<Row> writer) {
+               this.sink.setWriter(writer);
+               return this;
+       }
+
+       /**
+        * Specify the inProgressSuffix of BucketingTableSink.
+        *
+        * @param inProgressSuffix the inProgressSuffix of the 
BucketingTableSink.
+        */
+       public BucketingTableSinkBuilder setInProgressSuffix(String 
inProgressSuffix) {
+               this.sink.setInProgressSuffix(inProgressSuffix);
+               return this;
+       }
+
+       /**
+        * Specify the inProgressPrefix of BucketingTableSink.
+        *
+        * @param inProgressPrefix the inProgressPrefix of the 
BucketingTableSink.
+        */
+       public BucketingTableSinkBuilder setInProgressPrefix(String 
inProgressPrefix) {
+               this.sink.setInProgressPrefix(inProgressPrefix);
+               return this;
+       }
+
+       /**
+        * Specify the pendingSuffix of BucketingTableSink.
+        *
+        * @param pendingSuffix the pendingSuffix of the BucketingTableSink.
+        */
+       public BucketingTableSinkBuilder setPendingSuffix(String pendingSuffix) 
{
+               this.sink.setPendingSuffix(pendingSuffix);
+               return this;
+       }
+
+       /**
+        * Specify the pendingPrefix of BucketingTableSink.
+        *
+        * @param pendingPrefix the pendingPrefix of the BucketingTableSink.
+        */
+       public BucketingTableSinkBuilder setPendingPrefix(String pendingPrefix) 
{
+               this.sink.setPendingPrefix(pendingPrefix);
+               return this;
+       }
+
+       /**
+        * Specify the validLengthSuffix of BucketingTableSink.
+        *
+        * @param validLengthSuffix the validLengthSuffix of the 
BucketingTableSink.
+        */
+       public BucketingTableSinkBuilder setValidLengthSuffix(String 
validLengthSuffix) {
+               this.sink.setValidLengthSuffix(validLengthSuffix);
+               return this;
+       }
+
+       /**
+        * Specify the validLengthPrefix of BucketingTableSink.
+        *
+        * @param validLengthPrefix the validLengthPrefix of the 
BucketingTableSink.
+        */
+       public BucketingTableSinkBuilder setValidLengthPrefix(String 
validLengthPrefix) {
+               this.sink.setValidLengthPrefix(validLengthPrefix);
+               return this;
+       }
+
+       /**
+        * Specify the partSuffix of BucketingTableSink.
+        *
+        * @param partSuffix the partSuffix of the BucketingTableSink.
+        */
+       public BucketingTableSinkBuilder setPartSuffix(String partSuffix) {
+               this.sink.setPartSuffix(partSuffix);
+               return this;
+       }
+
+       /**
+        * Specify the partPrefix of BucketingTableSink.
+        *
+        * @param partPrefix the partPrefix of the BucketingTableSink.
+        */
+       public BucketingTableSinkBuilder setPartPrefix(String partPrefix) {
+               this.sink.setPartPrefix(partPrefix);
+               return this;
+       }
+
+       /**
+        * Specify the useTruncate of BucketingTableSink.
+        *
+        * @param useTruncate the useTruncate of the BucketingTableSink.
+        */
+       public BucketingTableSinkBuilder setUseTruncate(boolean useTruncate) {
+               this.sink.setUseTruncate(useTruncate);
+               return this;
+       }
+
+       /**
+        * Specify the asyncTimeout of BucketingTableSink.
+        *
+        * @param timeout the asyncTimeout of the BucketingTableSink.
+        */
+       public BucketingTableSinkBuilder setAsyncTimeout(long timeout) {
+               this.sink.setAsyncTimeout(timeout);
+               return this;
+       }
+
+       /**
+        * Finalizes the configuration and checks validity.
+        *
+        * @return BucketingTableSink
+        */
+       public BucketingTableSink build() {
+               return new BucketingTableSink(sink);
+       }
+
+}
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingTableSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingTableSinkTest.java
new file mode 100644
index 00000000000..4753369af1f
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingTableSinkTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+/**
+ * Test for BucketingTableSinkTest.
+ */
+public class BucketingTableSinkTest {
+       private static final String[] FIELD_NAMES = new String[] { "foo" };
+       private static final TypeInformation<?>[] FIELD_TYPES = new 
TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO };
+       private static final RowTypeInfo ROW_TYPE = new 
RowTypeInfo(FIELD_TYPES, FIELD_NAMES);
+
+       @Test
+       public void testBucketingTableSink() {
+               BucketingTableSink sink = 
BucketingTableSink.builder("/data/tmp/foo")
+                               .setBatchSize(384 * 1024 * 1024)
+                               .setBatchRolloverInterval(5 * 60 * 1000)
+                               .setBucketer(new 
DateTimeBucketer<Row>("yyyy/MM/dd"))
+                               .build();
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Row> ds = 
env.fromCollection(Collections.singleton(Row.of("foo")), ROW_TYPE);
+               sink.emitDataStream(ds);
+
+               Collection<Integer> sinkIds = env.getStreamGraph().getSinkIDs();
+               assertEquals(1, sinkIds.size());
+
+               int sinkId = sinkIds.iterator().next();
+               StreamSink planSink = (StreamSink) 
env.getStreamGraph().getStreamNode(sinkId).getOperator();
+               assertTrue(planSink.getUserFunction() instanceof 
BucketingSink<?>);
+       }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to