[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387712#comment-17387712
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 merged pull request #3334:
URL: https://github.com/apache/hudi/pull/3334


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Bulk insert for flink writer
> 
>
> Key: HUDI-2209
> URL: https://issues.apache.org/jira/browse/HUDI-2209
> Project: Apache Hudi
>  Issue Type: New Feature
>  Components: Flink Integration
>Reporter: Danny Chen
>Assignee: Danny Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.9.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387353#comment-17387353
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676614549



##
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.util.RowDataProjection;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Key generator for {@link RowData}.
+ */
+public class RowDataKeyGen {
+
+  // reference: NonpartitionedAvroKeyGenerator
+  private static final String EMPTY_PARTITION = "";
+
+  // reference: org.apache.hudi.keygen.KeyGenUtils
+  private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
+  private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
+
+  private static final String DEFAULT_PARTITION_PATH = "default";
+  private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+
+  private final String[] recordKeyFields;
+  private final String[] partitionPathFields;
+
+  private final RowDataProjection recordKeyProjection;
+  private final RowDataProjection partitionPathProjection;
+
+  private final boolean hiveStylePartitioning;
+  private final boolean encodePartitionPath;
+
+  // efficient code path
+  private boolean simpleRecordKey = false;
+  private RowData.FieldGetter recordKeyFieldGetter;
+
+  private boolean simplePartitionPath = false;
+  private RowData.FieldGetter partitionPathFieldGetter;
+
+  private boolean nonPartitioned;
+
+  private RowDataKeyGen(

Review comment:
   +1 for implementing that in follow-up PRs. Let's keep this key generator 
simple; it is suitable for most cases. The raw key generator classes are too 
hard for users to work with; maybe we can simplify them with simpler config 
options.
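
As a rough illustration of what a "simple" key generator covers, here is a dependency-free sketch. It is not the `RowDataKeyGen` from the PR: the class name `SimpleKeyGenSketch`, the `Map`-based row, and the exact error handling are assumptions made for illustration. Only the placeholder constants are taken from the quoted diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of simple key generation; a Map stands in for Flink's RowData. */
class SimpleKeyGenSketch {
  // Constants copied from the quoted diff.
  private static final String EMPTY_PARTITION = "";
  private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
  private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
  private static final String DEFAULT_PARTITION_PATH = "default";
  private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";

  /** One key field: the value itself. Several fields: "name:value" pairs joined by commas. */
  static String recordKey(Map<String, Object> row, List<String> keyFields) {
    if (keyFields.size() == 1) {
      Object value = row.get(keyFields.get(0));
      if (value == null || value.toString().isEmpty()) {
        // Assumption: a single null or empty key is rejected rather than replaced.
        throw new IllegalArgumentException("Record key field is null or empty: " + keyFields.get(0));
      }
      return value.toString();
    }
    List<String> parts = new ArrayList<>();
    for (String field : keyFields) {
      Object value = row.get(field);
      String text = value == null
          ? NULL_RECORDKEY_PLACEHOLDER
          : value.toString().isEmpty() ? EMPTY_RECORDKEY_PLACEHOLDER : value.toString();
      parts.add(field + ":" + text);
    }
    return String.join(",", parts);
  }

  /** Joins partition values with "/", optionally in Hive style ("field=value"). */
  static String partitionPath(Map<String, Object> row, List<String> partitionFields, boolean hiveStyle) {
    if (partitionFields.isEmpty()) {
      return EMPTY_PARTITION; // non-partitioned table
    }
    List<String> parts = new ArrayList<>();
    for (String field : partitionFields) {
      Object value = row.get(field);
      String text = (value == null || value.toString().isEmpty())
          ? DEFAULT_PARTITION_PATH
          : value.toString();
      parts.add(hiveStyle ? field + "=" + text : text);
    }
    return String.join(DEFAULT_PARTITION_PATH_SEPARATOR, parts);
  }

  public static void main(String[] args) {
    Map<String, Object> row = new HashMap<>();
    row.put("id", 1);
    row.put("dt", "2021-07-26");
    System.out.println(recordKey(row, Arrays.asList("id")));
    System.out.println(partitionPath(row, Arrays.asList("dt"), true));
  }
}
```

Anything beyond this, such as timestamp formatting or per-field custom rules, is what the full key generator classes would add.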






[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387351#comment-17387351
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676612350



##
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
##
@@ -0,0 +1,225 @@
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.TimeWait;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Sink function to write the data to the underneath filesystem.
+ *
+ * The function should only be used in operation type {@link WriteOperationType#BULK_INSERT}.
+ *
+ * Note: The function task requires the input stream be shuffled by partition path.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ */
+public class BulkInsertWriteFunction<I, O>
+    extends ProcessFunction<I, O> {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(BulkInsertWriteFunction.class);
+
+  /**
+   * Helper class for bulk insert mode.
+   */
+  private transient BulkInsertWriterHelper writerHelper;
+
+  /**
+   * Config options.
+   */
+  private final Configuration config;
+
+  /**
+   * Table row type.
+   */
+  private final RowType rowType;
+
+  /**
+   * Id of current subtask.
+   */
+  private int taskID;
+
+  /**
+   * Write Client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  /**
+   * The initial inflight instant when start up.
+   */
+  private volatile String initInstant;
+
+  /**
+   * Gateway to send operator events to the operator coordinator.
+   */
+  private transient OperatorEventGateway eventGateway;
+
+  /**
+   * Commit action type.
+   */
+  private transient String actionType;
+
+  /**
+   * Constructs a StreamingSinkFunction.
+   *
+   * @param config The config options
+   */
+  public BulkInsertWriteFunction(Configuration config, RowType rowType) {
+    this.config = config;
+    this.rowType = rowType;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws IOException {
+    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+    this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
+    this.actionType = CommitUtils.getCommitActionType(
+        WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
+        HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));

Review comment:
   The `HoodieTableSink` can ensure that the operation is definitely 
bulk_insert.
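
For context, the action type computed in `open` depends on both the configured operation and the table type. The following is a minimal sketch of that mapping, assuming it follows Hudi's usual timeline action names (`commit`, `deltacommit`, `replacecommit`). The enums and the class `CommitActionSketch` are hypothetical stand-ins, not the real `WriteOperationType`, `HoodieTableType`, or `CommitUtils`.

```java
/** Hypothetical stand-in for the operation/table-type to action-type mapping. */
class CommitActionSketch {
  enum Operation { INSERT, UPSERT, BULK_INSERT, INSERT_OVERWRITE }

  enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

  /**
   * Assumed mapping: overwrite operations produce a replace commit; every other
   * operation, including bulk_insert, falls back to the table type.
   */
  static String commitActionType(Operation operation, TableType tableType) {
    if (operation == Operation.INSERT_OVERWRITE) {
      return "replacecommit";
    }
    return tableType == TableType.MERGE_ON_READ ? "deltacommit" : "commit";
  }

  public static void main(String[] args) {
    System.out.println(commitActionType(Operation.BULK_INSERT, TableType.COPY_ON_WRITE));
    System.out.println(commitActionType(Operation.BULK_INSERT, TableType.MERGE_ON_READ));
  }
}
```

Under this assumption, a function that only ever runs with bulk_insert would see the action type vary with the table type alone.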





[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387339#comment-17387339
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

garyli1019 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676594403



##
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
##
@@ -0,0 +1,228 @@
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.util.RowDataProjection;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Key generator for {@link RowData}.
+ */
+public class RowDataKeyGen {
+
+  // reference: NonpartitionedAvroKeyGenerator
+  private static final String EMPTY_PARTITION = "";
+
+  // reference: org.apache.hudi.keygen.KeyGenUtils
+  private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
+  private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
+
+  private static final String DEFAULT_PARTITION_PATH = "default";
+  private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+
+  private final String[] recordKeyFields;
+  private final String[] partitionPathFields;
+
+  private final RowDataProjection recordKeyProjection;
+  private final RowDataProjection partitionPathProjection;
+
+  private final boolean hiveStylePartitioning;
+  private final boolean encodePartitionPath;
+
+  // efficient code path
+  private boolean simpleRecordKey = false;
+  private RowData.FieldGetter recordKeyFieldGetter;
+
+  private boolean simplePartitionPath = false;
+  private RowData.FieldGetter partitionPathFieldGetter;
+
+  private boolean nonPartitioned;
+
+  private RowDataKeyGen(

Review comment:
   KeyGenerator is a very powerful tool in Hudi. SQL users can still define 
the key generator through the config, right? I use the TimestampBased key 
generator for all the time-partitioned tables. I think we should at least 
implement the TimestampBased keygen and Custom keygen for Flink RowData later 
on.
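
To make the time-partitioned use case concrete, here is a small sketch of deriving a partition path from an epoch-millisecond timestamp. It only illustrates the idea and is not Hudi's TimestampBased key generator: the class `TimestampPartitionSketch`, the fixed UTC zone, and the pattern parameter are assumptions.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical sketch: turn an epoch-millis field into a date-based partition path. */
class TimestampPartitionSketch {

  /** Formats the timestamp in UTC with the given output pattern, e.g. "yyyy/MM/dd". */
  static String partitionPath(long epochMillis, String outputPattern) {
    DateTimeFormatter formatter =
        DateTimeFormatter.ofPattern(outputPattern).withZone(ZoneOffset.UTC);
    return formatter.format(Instant.ofEpochMilli(epochMillis));
  }

  public static void main(String[] args) {
    long ts = Instant.parse("2021-07-26T10:15:30Z").toEpochMilli();
    System.out.println(partitionPath(ts, "yyyy/MM/dd"));
  }
}
```

A RowData-based variant would read the timestamp through a field getter instead of taking a `long` argument.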






[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387335#comment-17387335
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

garyli1019 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676590742



##
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
##
@@ -0,0 +1,225 @@
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.TimeWait;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Sink function to write the data to the underneath filesystem.
+ *
+ * The function should only be used in operation type {@link WriteOperationType#BULK_INSERT}.
+ *
+ * Note: The function task requires the input stream be shuffled by partition path.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ */
+public class BulkInsertWriteFunction<I, O>
+    extends ProcessFunction<I, O> {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(BulkInsertWriteFunction.class);
+
+  /**
+   * Helper class for bulk insert mode.
+   */
+  private transient BulkInsertWriterHelper writerHelper;
+
+  /**
+   * Config options.
+   */
+  private final Configuration config;
+
+  /**
+   * Table row type.
+   */
+  private final RowType rowType;
+
+  /**
+   * Id of current subtask.
+   */
+  private int taskID;
+
+  /**
+   * Write Client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  /**
+   * The initial inflight instant when start up.
+   */
+  private volatile String initInstant;
+
+  /**
+   * Gateway to send operator events to the operator coordinator.
+   */
+  private transient OperatorEventGateway eventGateway;
+
+  /**
+   * Commit action type.
+   */
+  private transient String actionType;
+
+  /**
+   * Constructs a StreamingSinkFunction.
+   *
+   * @param config The config options
+   */
+  public BulkInsertWriteFunction(Configuration config, RowType rowType) {
+    this.config = config;
+    this.rowType = rowType;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws IOException {
+    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+    this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
+    this.actionType = CommitUtils.getCommitActionType(
+        WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
+        HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));

Review comment:
   This is the BulkInsertWriteFunction, so the action should at least be 
bulk_insert?





[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387304#comment-17387304
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676560948



##
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
##
@@ -0,0 +1,228 @@
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.util.RowDataProjection;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Key generator for {@link RowData}.
+ */
+public class RowDataKeyGen {
+
+  // reference: NonpartitionedAvroKeyGenerator
+  private static final String EMPTY_PARTITION = "";
+
+  // reference: org.apache.hudi.keygen.KeyGenUtils
+  private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
+  private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
+
+  private static final String DEFAULT_PARTITION_PATH = "default";
+  private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+
+  private final String[] recordKeyFields;
+  private final String[] partitionPathFields;
+
+  private final RowDataProjection recordKeyProjection;
+  private final RowDataProjection partitionPathProjection;
+
+  private final boolean hiveStylePartitioning;
+  private final boolean encodePartitionPath;
+
+  // efficient code path
+  private boolean simpleRecordKey = false;
+  private RowData.FieldGetter recordKeyFieldGetter;
+
+  private boolean simplePartitionPath = false;
+  private RowData.FieldGetter partitionPathFieldGetter;
+
+  private boolean nonPartitioned;
+
+  private RowDataKeyGen(

Review comment:
   SQL users do not need to care about which key generator is used; they 
should simply use the primary key syntax instead. The `KeyGenerator` is a 
developer interface, IMO.






[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387302#comment-17387302
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676559586



##
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
##
@@ -0,0 +1,225 @@
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.TimeWait;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Sink function to write the data to the underneath filesystem.
+ *
+ * The function should only be used in operation type {@link WriteOperationType#BULK_INSERT}.
+ *
+ * Note: The function task requires the input stream be shuffled by partition path.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ */
+public class BulkInsertWriteFunction<I, O>
+    extends ProcessFunction<I, O> {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(BulkInsertWriteFunction.class);
+
+  /**
+   * Helper class for bulk insert mode.
+   */
+  private transient BulkInsertWriterHelper writerHelper;
+
+  /**
+   * Config options.
+   */
+  private final Configuration config;
+
+  /**
+   * Table row type.
+   */
+  private final RowType rowType;
+
+  /**
+   * Id of current subtask.
+   */
+  private int taskID;
+
+  /**
+   * Write Client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  /**
+   * The initial inflight instant when start up.
+   */
+  private volatile String initInstant;
+
+  /**
+   * Gateway to send operator events to the operator coordinator.
+   */
+  private transient OperatorEventGateway eventGateway;
+
+  /**
+   * Commit action type.
+   */
+  private transient String actionType;
+
+  /**
+   * Constructs a StreamingSinkFunction.
+   *
+   * @param config The config options
+   */
+  public BulkInsertWriteFunction(Configuration config, RowType rowType) {
+    this.config = config;
+    this.rowType = rowType;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws IOException {
+    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+    this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
+    this.actionType = CommitUtils.getCommitActionType(
+        WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
+        HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));

Review comment:
   Users can choose whatever they like; I don't think we need to put 
restrictions on the code.





[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387301#comment-17387301
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676385780



##
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
##
@@ -0,0 +1,86 @@
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.api.WriteSupport;
+
+import java.util.HashMap;
+
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+
+/**
+ * Hoodie Write Support for directly writing {@link RowData} to Parquet.
+ */
+public class HoodieRowDataParquetWriteSupport extends RowDataParquetWriteSupport {
+
+  private final Configuration hadoopConf;
+  private final BloomFilter bloomFilter;
+  private String minRecordKey;
+  private String maxRecordKey;
+
+  public HoodieRowDataParquetWriteSupport(Configuration conf, RowType rowType, BloomFilter bloomFilter) {
+    super(rowType);
+    this.hadoopConf = new Configuration(conf);
+    this.bloomFilter = bloomFilter;
+  }
+
+  public Configuration getHadoopConf() {
+    return hadoopConf;
+  }
+
+  @Override
+  public WriteSupport.FinalizedWriteContext finalizeWrite() {
+    HashMap<String, String> extraMetaData = new HashMap<>();

Review comment:
   `extraMetaData` looks fine to me; I have no strong preference for this 
local variable name.
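
For readers unfamiliar with what ends up in that map, here is a self-contained sketch of tracking the minimum and maximum record key and exposing them as footer metadata. It is an assumption-laden illustration, not the class from the PR: `RecordKeyRangeSketch` and its methods are hypothetical, the footer key strings are assumed values for the constants imported in the diff, and the bloom filter entry is left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of min/max record key tracking for file footer metadata. */
class RecordKeyRangeSketch {
  // Assumed values for HOODIE_MIN_RECORD_KEY_FOOTER / HOODIE_MAX_RECORD_KEY_FOOTER.
  static final String MIN_RECORD_KEY_FOOTER = "hoodie_min_record_key";
  static final String MAX_RECORD_KEY_FOOTER = "hoodie_max_record_key";

  private String minRecordKey;
  private String maxRecordKey;

  /** Updates the running min/max using lexicographic string comparison. */
  void add(String recordKey) {
    if (minRecordKey == null || recordKey.compareTo(minRecordKey) < 0) {
      minRecordKey = recordKey;
    }
    if (maxRecordKey == null || recordKey.compareTo(maxRecordKey) > 0) {
      maxRecordKey = recordKey;
    }
  }

  /** Builds the extra metadata map; it stays empty if no key was ever added. */
  Map<String, String> footerMetadata() {
    Map<String, String> extraMetaData = new HashMap<>();
    if (minRecordKey != null && maxRecordKey != null) {
      extraMetaData.put(MIN_RECORD_KEY_FOOTER, minRecordKey);
      extraMetaData.put(MAX_RECORD_KEY_FOOTER, maxRecordKey);
    }
    return extraMetaData;
  }

  public static void main(String[] args) {
    RecordKeyRangeSketch range = new RecordKeyRangeSketch();
    range.add("k2");
    range.add("k1");
    range.add("k3");
    System.out.println(range.footerMetadata());
  }
}
```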






[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387291#comment-17387291
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

garyli1019 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676545936



##
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.util.RowDataProjection;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Key generator for {@link RowData}.
+ */
+public class RowDataKeyGen {
+
+  // reference: NonpartitionedAvroKeyGenerator
+  private static final String EMPTY_PARTITION = "";
+
+  // reference: org.apache.hudi.keygen.KeyGenUtils
+  private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
+  private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
+
+  private static final String DEFAULT_PARTITION_PATH = "default";
+  private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+
+  private final String[] recordKeyFields;
+  private final String[] partitionPathFields;
+
+  private final RowDataProjection recordKeyProjection;
+  private final RowDataProjection partitionPathProjection;
+
+  private final boolean hiveStylePartitioning;
+  private final boolean encodePartitionPath;
+
+  // efficient code path
+  private boolean simpleRecordKey = false;
+  private RowData.FieldGetter recordKeyFieldGetter;
+
+  private boolean simplePartitionPath = false;
+  private RowData.FieldGetter partitionPathFieldGetter;
+
+  private boolean nonPartitioned;
+
+  private RowDataKeyGen(

Review comment:
   `BuiltinKeyGenerator` makes a good interface for users to write their own key generator and take advantage of this optimization, though.
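To make the suggestion concrete, here is a minimal sketch of what a pluggable row-based key generator could look like. It is not code from the pull request: `RowKeyGenerator`, `SimpleRowKeyGenerator`, and the `Object[]` row (a stand-in for Flink's `RowData`) are hypothetical names, and the null/empty handling is an assumption loosely modeled on the `default` partition path constant in `RowDataKeyGen`.

```java
public class KeyGenSketch {

  /** Hypothetical extension point: derives the record key and partition path straight from a row. */
  public interface RowKeyGenerator<R> {
    String recordKey(R row);

    String partitionPath(R row);
  }

  /** Single-field generator over Object[] rows (a stand-in for Flink's RowData). */
  public static final class SimpleRowKeyGenerator implements RowKeyGenerator<Object[]> {
    private static final String DEFAULT_PARTITION_PATH = "default";

    private final int keyPos;
    private final int partitionPos;
    private final String partitionField;
    private final boolean hiveStylePartitioning;

    public SimpleRowKeyGenerator(int keyPos, int partitionPos, String partitionField,
                                 boolean hiveStylePartitioning) {
      this.keyPos = keyPos;
      this.partitionPos = partitionPos;
      this.partitionField = partitionField;
      this.hiveStylePartitioning = hiveStylePartitioning;
    }

    @Override
    public String recordKey(Object[] row) {
      Object value = row[keyPos];
      // Assumption: a null or empty record key is rejected rather than replaced.
      if (value == null || value.toString().isEmpty()) {
        throw new IllegalArgumentException("Record key field cannot be null or empty");
      }
      return value.toString();
    }

    @Override
    public String partitionPath(Object[] row) {
      Object value = row[partitionPos];
      // Assumption: a null or empty partition value falls back to the default partition.
      String path = (value == null || value.toString().isEmpty())
          ? DEFAULT_PARTITION_PATH
          : value.toString();
      return hiveStylePartitioning ? partitionField + "=" + path : path;
    }
  }

  public static void main(String[] args) {
    RowKeyGenerator<Object[]> keyGen = new SimpleRowKeyGenerator(0, 1, "dt", true);
    Object[] row = {"id1", "2021-07-26"};
    System.out.println(keyGen.recordKey(row) + " @ " + keyGen.partitionPath(row));
  }
}
```

With an interface of this shape, a user-defined generator could read fields directly from the row and skip the Avro conversion, which is the optimization under discussion.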






[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387290#comment-17387290
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

garyli1019 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676545264



##
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.util.RowDataProjection;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Key generator for {@link RowData}.
+ */
+public class RowDataKeyGen {
+
+  // reference: NonpartitionedAvroKeyGenerator
+  private static final String EMPTY_PARTITION = "";
+
+  // reference: org.apache.hudi.keygen.KeyGenUtils
+  private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
+  private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
+
+  private static final String DEFAULT_PARTITION_PATH = "default";
+  private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+
+  private final String[] recordKeyFields;
+  private final String[] partitionPathFields;
+
+  private final RowDataProjection recordKeyProjection;
+  private final RowDataProjection partitionPathProjection;
+
+  private final boolean hiveStylePartitioning;
+  private final boolean encodePartitionPath;
+
+  // efficient code path
+  private boolean simpleRecordKey = false;
+  private RowData.FieldGetter recordKeyFieldGetter;
+
+  private boolean simplePartitionPath = false;
+  private RowData.FieldGetter partitionPathFieldGetter;
+
+  private boolean nonPartitioned;
+
+  private RowDataKeyGen(

Review comment:
   What's your plan to get the `HoodieKey` with a different key generator 
directly from `RowData`?






[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387287#comment-17387287
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

garyli1019 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676544337



##
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
##
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.TimeWait;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Sink function to write the data to the underneath filesystem.
+ *
+ * The function should only be used in operation type {@link 
WriteOperationType#BULK_INSERT}.
+ *
+ * Note: The function task requires the input stream be shuffled by 
partition path.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ */
+public class BulkInsertWriteFunction<I, O>
+extends ProcessFunction<I, O> {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BulkInsertWriteFunction.class);
+
+  /**
+   * Helper class for bulk insert mode.
+   */
+  private transient BulkInsertWriterHelper writerHelper;
+
+  /**
+   * Config options.
+   */
+  private final Configuration config;
+
+  /**
+   * Table row type.
+   */
+  private final RowType rowType;
+
+  /**
+   * Id of current subtask.
+   */
+  private int taskID;
+
+  /**
+   * Write Client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  /**
+   * The initial inflight instant when start up.
+   */
+  private volatile String initInstant;
+
+  /**
+   * Gateway to send operator events to the operator coordinator.
+   */
+  private transient OperatorEventGateway eventGateway;
+
+  /**
+   * Commit action type.
+   */
+  private transient String actionType;
+
+  /**
+   * Constructs a StreamingSinkFunction.
+   *
+   * @param config The config options
+   */
+  public BulkInsertWriteFunction(Configuration config, RowType rowType) {
+this.config = config;
+this.rowType = rowType;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws IOException {
+this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+this.writeClient = StreamerUtil.createWriteClient(this.config, 
getRuntimeContext());
+this.actionType = CommitUtils.getCommitActionType(
+WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
+HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));

Review comment:
   Why do you need MOR, though? MOR still needs to do the Avro serialization and will have poor read performance.





[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387206#comment-17387206
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * 6f7f38716d9a2c0ef10ebf2c349cdcf0e5f053de UNKNOWN
   * c41c298b8ff88ddf5f34654d3641a461f895d5e2 UNKNOWN
   * ee775698bc6eac917a9c410dd92edc8ca850d719 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1157)
 
   
   


[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387181#comment-17387181
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * 6f7f38716d9a2c0ef10ebf2c349cdcf0e5f053de UNKNOWN
   * f7adabe43137d7c9052acd2a361b6fc429f9af72 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1151)
 
   * c41c298b8ff88ddf5f34654d3641a461f895d5e2 UNKNOWN
   * ee775698bc6eac917a9c410dd92edc8ca850d719 Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1157)
 
   
   


[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387169#comment-17387169
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * 6f7f38716d9a2c0ef10ebf2c349cdcf0e5f053de UNKNOWN
   * f7adabe43137d7c9052acd2a361b6fc429f9af72 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1151)
 
   * c41c298b8ff88ddf5f34654d3641a461f895d5e2 UNKNOWN
   * ee775698bc6eac917a9c410dd92edc8ca850d719 UNKNOWN
   
   


[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387167#comment-17387167
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676388084



##
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.util.RowDataProjection;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Key generator for {@link RowData}.
+ */
+public class RowDataKeyGen {
+
+  // reference: NonpartitionedAvroKeyGenerator
+  private static final String EMPTY_PARTITION = "";
+
+  // reference: org.apache.hudi.keygen.KeyGenUtils
+  private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
+  private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
+
+  private static final String DEFAULT_PARTITION_PATH = "default";
+  private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+
+  private final String[] recordKeyFields;
+  private final String[] partitionPathFields;
+
+  private final RowDataProjection recordKeyProjection;
+  private final RowDataProjection partitionPathProjection;
+
+  private final boolean hiveStylePartitioning;
+  private final boolean encodePartitionPath;
+
+  // efficient code path
+  private boolean simpleRecordKey = false;
+  private RowData.FieldGetter recordKeyFieldGetter;
+
+  private boolean simplePartitionPath = false;
+  private RowData.FieldGetter partitionPathFieldGetter;
+
+  private boolean nonPartitioned;
+
+  private RowDataKeyGen(

Review comment:
   I don't think `BuiltinKeyGenerator` is a good example.






[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387158#comment-17387158
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676386465



##
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
##
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.TimeWait;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Sink function to write the data to the underneath filesystem.
+ *
+ * The function should only be used in operation type {@link 
WriteOperationType#BULK_INSERT}.
+ *
+ * Note: The function task requires the input stream be shuffled by 
partition path.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ */
+public class BulkInsertWriteFunction<I, O>
+extends ProcessFunction<I, O> {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BulkInsertWriteFunction.class);
+
+  /**
+   * Helper class for bulk insert mode.
+   */
+  private transient BulkInsertWriterHelper writerHelper;
+
+  /**
+   * Config options.
+   */
+  private final Configuration config;
+
+  /**
+   * Table row type.
+   */
+  private final RowType rowType;
+
+  /**
+   * Id of current subtask.
+   */
+  private int taskID;
+
+  /**
+   * Write Client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  /**
+   * The initial inflight instant when start up.
+   */
+  private volatile String initInstant;
+
+  /**
+   * Gateway to send operator events to the operator coordinator.
+   */
+  private transient OperatorEventGateway eventGateway;
+
+  /**
+   * Commit action type.
+   */
+  private transient String actionType;
+
+  /**
+   * Constructs a StreamingSinkFunction.
+   *
+   * @param config The config options
+   */
+  public BulkInsertWriteFunction(Configuration config, RowType rowType) {
+this.config = config;
+this.rowType = rowType;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws IOException {
+this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+this.writeClient = StreamerUtil.createWriteClient(this.config, 
getRuntimeContext());
+this.actionType = CommitUtils.getCommitActionType(
+WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
+HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));

Review comment:
   MOR also works, actually.
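For readers following the table-type discussion, the sketch below illustrates how the commit action is usually derived from the operation and table type. It is a simplified stand-in, not the body of `CommitUtils.getCommitActionType`: the enums are hypothetical reductions of `WriteOperationType` and `HoodieTableType`, and the action names (`commit`, `deltacommit`, `replacecommit`) reflect my understanding of Hudi's timeline actions.

```java
public class CommitActionSketch {

  /** Simplified stand-in for Hudi's HoodieTableType. */
  public enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

  /** Simplified stand-in for Hudi's WriteOperationType (only two values modeled). */
  public enum Operation { BULK_INSERT, INSERT_OVERWRITE }

  /** Picks the timeline action a write would be committed under. */
  public static String commitActionType(Operation operation, TableType tableType) {
    // Overwrite-style operations replace file groups regardless of the table type.
    if (operation == Operation.INSERT_OVERWRITE) {
      return "replacecommit";
    }
    switch (tableType) {
      case COPY_ON_WRITE:
        return "commit";
      case MERGE_ON_READ:
        return "deltacommit";
      default:
        throw new IllegalArgumentException("Unsupported table type: " + tableType);
    }
  }

  public static void main(String[] args) {
    for (TableType type : TableType.values()) {
      System.out.println(type + " -> " + commitActionType(Operation.BULK_INSERT, type));
    }
  }
}
```

Under this reading, a bulk insert into a MOR table only changes the action name recorded on the timeline, so the write function does not need a separate code path per table type.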





[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387157#comment-17387157
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676385780



##
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.api.WriteSupport;
+
+import java.util.HashMap;
+
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+
+/**
+ * Hoodie Write Support for directly writing {@link RowData} to Parquet.
+ */
+public class HoodieRowDataParquetWriteSupport extends 
RowDataParquetWriteSupport {
+
+  private final Configuration hadoopConf;
+  private final BloomFilter bloomFilter;
+  private String minRecordKey;
+  private String maxRecordKey;
+
+  public HoodieRowDataParquetWriteSupport(Configuration conf, RowType rowType, 
BloomFilter bloomFilter) {
+super(rowType);
+this.hadoopConf = new Configuration(conf);
+this.bloomFilter = bloomFilter;
+  }
+
+  public Configuration getHadoopConf() {
+return hadoopConf;
+  }
+
+  @Override
+  public WriteSupport.FinalizedWriteContext finalizeWrite() {
+HashMap<String, String> extraMetaData = new HashMap<>();

Review comment:
   `extraMetaData` looks fine to me; I have no strong preference for this local variable name.






[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387156#comment-17387156
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676385494



##
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.api.WriteSupport;
+
+import java.util.HashMap;
+
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+
+/**
+ * Hoodie Write Support for directly writing {@link RowData} to Parquet.
+ */
+public class HoodieRowDataParquetWriteSupport extends 
RowDataParquetWriteSupport {
+
+  private final Configuration hadoopConf;
+  private final BloomFilter bloomFilter;
+  private String minRecordKey;
+  private String maxRecordKey;
+
+  public HoodieRowDataParquetWriteSupport(Configuration conf, RowType rowType, 
BloomFilter bloomFilter) {
+super(rowType);
+this.hadoopConf = new Configuration(conf);
+this.bloomFilter = bloomFilter;
+  }
+
+  public Configuration getHadoopConf() {
+return hadoopConf;
+  }
+
+  @Override
+  public WriteSupport.FinalizedWriteContext finalizeWrite() {
+HashMap extraMetaData = new HashMap<>();

Review comment:
   `extraMetaData` looks fine to me; I have no strong preference for this local variable name.






[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387154#comment-17387154
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676383783



##
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
##
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.model.HoodieRowData;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Create handle with RowData for datasource implemention of bulk insert.
+ */
+public class HoodieRowDataCreateHandle implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LogManager.getLogger(HoodieRowDataCreateHandle.class);
+  private static final AtomicLong SEQGEN = new AtomicLong(1);
+
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable table;
+  private final HoodieWriteConfig writeConfig;
+  protected final HoodieRowDataFileWriter fileWriter;
+  private final String partitionPath;
+  private final Path path;
+  private final String fileId;
+  private final FileSystem fs;
+  protected final HoodieInternalWriteStatus writeStatus;
+  private final HoodieTimer currTimer;
+
+  public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId,
+                                   String instantTime, int taskPartitionId, long taskId, long taskEpochId,
+                                   RowType rowType) {
+    this.partitionPath = partitionPath;
+    this.table = table;
+    this.writeConfig = writeConfig;
+    this.instantTime = instantTime;
+    this.taskPartitionId = taskPartitionId;
+    this.taskId = taskId;
+    this.taskEpochId = taskEpochId;
+    this.fileId = fileId;
+    this.currTimer = new HoodieTimer();
+    this.currTimer.startTimer();
+    this.fs = table.getMetaClient().getFs();
+    this.path = makeNewPath(partitionPath);
+    this.writeStatus = new HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(),
+        writeConfig.getWriteStatusFailureFraction());
+    writeStatus.setPartitionPath(partitionPath);
+    writeStatus.setFileId(fileId);
+    try {
+      HoodiePartitionMetadata partitionMetadata =
+          new HoodiePartitionMetadata(
+              fs,
+              instantTime,
+              new Path(writeConfig.getBasePath()),
+              FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath));
+      partitionMetadata.trySave(taskPartitionId);
+      createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, getWriteToken(), this.fileId, table.getBaseFileExtension()));
+      this.fileWriter = createNewFileWriter(path, table, writeConfig, rowType);
+    } catch (IOException e) {
+      throw new HoodieInsertException("Failed to initialize file writer for pat

[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387153#comment-17387153
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676382204



##
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Internal Row implementation for Hoodie Row. It wraps an {@link RowData} and keeps meta columns locally. But the {@link RowData}
+ * does include the meta columns as well just that {@link HoodieRowData} will intercept queries for meta columns and serve from its
+ * copy rather than fetching from {@link RowData}.
+ */
+public class HoodieRowData implements RowData {
+
+  private final String commitTime;
+  private final String commitSeqNumber;
+  private final String recordKey;
+  private final String partitionPath;
+  private final String fileName;
+  private final RowData row;
+  private final int metaColumnsNum;
+
+  public HoodieRowData(String commitTime,
+                       String commitSeqNumber,
+                       String recordKey,
+                       String partitionPath,
+                       String fileName,
+                       RowData row) {
+    this.commitTime = commitTime;
+    this.commitSeqNumber = commitSeqNumber;
+    this.recordKey = recordKey;
+    this.partitionPath = partitionPath;
+    this.fileName = fileName;
+    this.row = row;
+    this.metaColumnsNum = HoodieRecord.HOODIE_META_COLUMNS.size();
+  }
+
+  @Override
+  public int getArity() {
+    return 5 + row.getArity();
+  }
+
+  @Override
+  public RowKind getRowKind() {
+    return row.getRowKind();
+  }
+
+  @Override
+  public void setRowKind(RowKind kind) {
+    this.row.setRowKind(kind);
+  }
+
+  private String getMetaColumnVal(int ordinal) {
+    switch (ordinal) {
+      case 0: {
+        return commitTime;
+      }
+      case 1: {
+        return commitSeqNumber;
+      }
+      case 2: {
+        return recordKey;
+      }
+      case 3: {
+        return partitionPath;
+      }
+      case 4: {
+        return fileName;
+      }
+      default:
+        throw new IllegalArgumentException("Not expected");
+    }
+  }
+
+  @Override
+  public boolean isNullAt(int ordinal) {
+    if (ordinal < metaColumnsNum) {
+      return null == getMetaColumnVal(ordinal);
+    }
+    return row.isNullAt(ordinal - metaColumnsNum);
+  }
+
+  @Override
+  public boolean getBoolean(int ordinal) {
+    return row.getBoolean(ordinal - metaColumnsNum);

Review comment:
   No. We could use a `JoinedRowData` instead, but that code is actually not as efficient as this one.
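
   For illustration only, here is a minimal self-contained sketch of the wrapper pattern under discussion. It deliberately does not use Flink's `RowData`; `SimpleRow`, `ArrayRow`, `MetaPrefixedRow` and `MetaPrefixedRowDemo` are hypothetical stand-ins that exist only in this sketch, and the wrapped row is assumed to hold data columns only. The idea shown is the one visible in the diff: meta columns are served from a local copy, and every other ordinal is shifted by the meta-column count and delegated to the wrapped row. The sketch makes no claim about how `JoinedRowData` works or why it would be slower.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified stand-in for a row interface (NOT Flink's RowData). */
interface SimpleRow {
  int getArity();

  boolean isNullAt(int ordinal);

  String getString(int ordinal);
}

/** Array-backed row that plays the role of the wrapped data row. */
final class ArrayRow implements SimpleRow {
  private final String[] values;

  ArrayRow(String... values) {
    this.values = values;
  }

  @Override
  public int getArity() {
    return values.length;
  }

  @Override
  public boolean isNullAt(int ordinal) {
    return values[ordinal] == null;
  }

  @Override
  public String getString(int ordinal) {
    return values[ordinal];
  }
}

/** Serves meta columns from its own copy and shifts every other ordinal onto the wrapped row. */
final class MetaPrefixedRow implements SimpleRow {
  private final List<String> metaValues;
  private final SimpleRow row;

  MetaPrefixedRow(List<String> metaValues, SimpleRow row) {
    this.metaValues = metaValues;
    this.row = row;
  }

  @Override
  public int getArity() {
    // Derive the meta-column count from the list instead of hard-coding it.
    return metaValues.size() + row.getArity();
  }

  @Override
  public boolean isNullAt(int ordinal) {
    if (ordinal < metaValues.size()) {
      return metaValues.get(ordinal) == null;
    }
    return row.isNullAt(ordinal - metaValues.size());
  }

  @Override
  public String getString(int ordinal) {
    if (ordinal < metaValues.size()) {
      return metaValues.get(ordinal);
    }
    return row.getString(ordinal - metaValues.size());
  }
}

/** Tiny driver so the sketch can be run directly. */
final class MetaPrefixedRowDemo {
  public static void main(String[] args) {
    SimpleRow wrapped = new MetaPrefixedRow(
        Arrays.asList("commitTime", "seqNo", "recordKey", "partitionPath", "fileName"),
        new ArrayRow("a", null));
    System.out.println(wrapped.getArity());
    System.out.println(wrapped.getString(5));
  }
}
```

   With five meta values and a two-column data row, the wrapper reports an arity of 7, and ordinal 5 resolves to ordinal 0 of the wrapped row. No second row object is built for the meta columns; each accessor does one comparison and at most one subtraction before delegating.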






[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387152#comment-17387152
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676381427



##
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Internal Row implementation for Hoodie Row. It wraps an {@link RowData} and keeps meta columns locally. But the {@link RowData}
+ * does include the meta columns as well just that {@link HoodieRowData} will intercept queries for meta columns and serve from its
+ * copy rather than fetching from {@link RowData}.
+ */
+public class HoodieRowData implements RowData {
+
+  private final String commitTime;
+  private final String commitSeqNumber;
+  private final String recordKey;
+  private final String partitionPath;
+  private final String fileName;
+  private final RowData row;
+  private final int metaColumnsNum;
+
+  public HoodieRowData(String commitTime,
+                       String commitSeqNumber,
+                       String recordKey,
+                       String partitionPath,
+                       String fileName,
+                       RowData row) {
+    this.commitTime = commitTime;
+    this.commitSeqNumber = commitSeqNumber;
+    this.recordKey = recordKey;
+    this.partitionPath = partitionPath;
+    this.fileName = fileName;
+    this.row = row;
+    this.metaColumnsNum = HoodieRecord.HOODIE_META_COLUMNS.size();
+  }
+
+  @Override
+  public int getArity() {
+    return 5 + row.getArity();

Review comment:
   Nice catch ~






[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387149#comment-17387149
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * 6f7f38716d9a2c0ef10ebf2c349cdcf0e5f053de UNKNOWN
   * f7adabe43137d7c9052acd2a361b6fc429f9af72 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1151)
   * c41c298b8ff88ddf5f34654d3641a461f895d5e2 UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run travis` re-run the last Travis build
- `@hudi-bot run azure` re-run the last Azure build
   




[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387101#comment-17387101
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * 6f7f38716d9a2c0ef10ebf2c349cdcf0e5f053de UNKNOWN
   * f7adabe43137d7c9052acd2a361b6fc429f9af72 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1151)
   
   


[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387071#comment-17387071
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * 6f7f38716d9a2c0ef10ebf2c349cdcf0e5f053de UNKNOWN
   * 6fecd2b043e75f3327d4a8f7348c7675682fcd70 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1139)
   * f7adabe43137d7c9052acd2a361b6fc429f9af72 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1151)
   
   


[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387053#comment-17387053
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * 6f7f38716d9a2c0ef10ebf2c349cdcf0e5f053de UNKNOWN
   * 6fecd2b043e75f3327d4a8f7348c7675682fcd70 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1139)
   * f7adabe43137d7c9052acd2a361b6fc429f9af72 UNKNOWN
   
   


[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387013#comment-17387013
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

garyli1019 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676092014



##
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Internal Row implementation for Hoodie Row. It wraps an {@link RowData} and keeps meta columns locally. But the {@link RowData}

Review comment:
   `Flink RowData` instead of `Internal Row`? 

##
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Internal Row implementation for Hoodie Row. It wraps an {@link RowData} and keeps meta columns locally. But the {@link RowData}
+ * does include the meta columns as well just that {@link HoodieRowData} will intercept queries for meta columns and serve from its
+ * copy rather than fetching from {@link RowData}.
+ */
+public class HoodieRowData implements RowData {
+
+  private final String commitTime;
+  private final String commitSeqNumber;
+  private final String recordKey;
+  private final String partitionPath;
+  private final String fileName;
+  private final RowData row;
+  private final int metaColumnsNum;
+
+  public HoodieRowData(String commitTime,
+                       String commitSeqNumber,
+                       String recordKey,
+                       String partitionPath,
+                       String fileName,
+                       RowData row) {
+    this.commitTime = commitTime;
+    this.commitSeqNumber = commitSeqNumber;
+    this.recordKey = recordKey;
+    this.partitionPath = partitionPath;
+    this.fileName = fileName;
+    this.row = row;
+    this.metaColumnsNum = HoodieRecord.HOODIE_META_COLUMNS.size();
+  }
+
+  @Override
+  public int getArity() {
+    return 5 + row.getArity();
+  }
+
+  @Override
+  public RowKind getRowKind() {
+    return row.getRowKind();
+  }
+
+  @Override
+  public void setRowKind(RowKind kind) {
+    this.row.setRowKind(kind);
+  }
+
+  private String getMetaColumnVal(int ordinal) {
+    switch (ordinal) {
+      case 0: {
+   

[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386969#comment-17386969
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

yuzhaojing commented on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-886265827


   +1 LGTM




[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386656#comment-17386656
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 closed pull request #3334:
URL: https://github.com/apache/hudi/pull/3334


   




[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386657#comment-17386657
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 opened a new pull request #3334:
URL: https://github.com/apache/hudi/pull/3334


   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
   
   ## What is the purpose of the pull request
   
   *(For example: This pull request adds quick-start document.)*
   
   ## Brief change log
   
   *(for example:)*
   - *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   (or)
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
   - *Added integration tests for end-to-end.*
   - *Added HoodieClientWriteTest to verify the change.*
   - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
   - [ ] Has a corresponding JIRA in PR title & commit
   - [ ] Commit message is descriptive of the change
   - [ ] CI is green
   - [ ] Necessary doc changes done or have another open PR
   - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.




[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386638#comment-17386638
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 opened a new pull request #3334:
URL: https://github.com/apache/hudi/pull/3334


   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
   
   ## What is the purpose of the pull request
   
   *(For example: This pull request adds quick-start document.)*
   
   ## Brief change log
   
   *(for example:)*
 - *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
 - *Added integration tests for end-to-end.*
 - *Added HoodieClientWriteTest to verify the change.*
 - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
- [ ] Has a corresponding JIRA in PR title & commit

- [ ] Commit message is descriptive of the change

- [ ] CI is green
   
- [ ] Necessary doc changes done or have another open PR
  
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.




[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386637#comment-17386637
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 closed pull request #3334:
URL: https://github.com/apache/hudi/pull/3334


   




[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386609#comment-17386609
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * 6f7f38716d9a2c0ef10ebf2c349cdcf0e5f053de UNKNOWN
   * 6fecd2b043e75f3327d4a8f7348c7675682fcd70 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1139)
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run travis` re-run the last Travis build
- `@hudi-bot run azure` re-run the last Azure build
   




[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386604#comment-17386604
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * 5bc8824bcb6983e12596d79a4b9df0b9c42a4502 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1137)
   * 6f7f38716d9a2c0ef10ebf2c349cdcf0e5f053de UNKNOWN
   * 6fecd2b043e75f3327d4a8f7348c7675682fcd70 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1139)
   
   


[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386602#comment-17386602
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * 5bc8824bcb6983e12596d79a4b9df0b9c42a4502 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1137)
   * 6f7f38716d9a2c0ef10ebf2c349cdcf0e5f053de UNKNOWN
   * 6fecd2b043e75f3327d4a8f7348c7675682fcd70 UNKNOWN
   
   


[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386599#comment-17386599
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * 5bc8824bcb6983e12596d79a4b9df0b9c42a4502 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1137)
   * 6f7f38716d9a2c0ef10ebf2c349cdcf0e5f053de UNKNOWN
   
   


[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386598#comment-17386598
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * bec06d2304c67b544befff79bc6559520024f7b3 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1134)
   * 5bc8824bcb6983e12596d79a4b9df0b9c42a4502 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1137)
   * 6f7f38716d9a2c0ef10ebf2c349cdcf0e5f053de UNKNOWN
   
   


[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386587#comment-17386587
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * bec06d2304c67b544befff79bc6559520024f7b3 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1134)
   * 5bc8824bcb6983e12596d79a4b9df0b9c42a4502 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1137)
   
   


[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386584#comment-17386584
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * bec06d2304c67b544befff79bc6559520024f7b3 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1134)
   * 5bc8824bcb6983e12596d79a4b9df0b9c42a4502 UNKNOWN
   
   


[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386580#comment-17386580
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r675907954



##
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
##
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Helper class for bulk insert used by Flink.
+ */
+public class BulkInsertWriterHelper {
+
+  private static final Logger LOG = LogManager.getLogger(BulkInsertWriterHelper.class);
+
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig writeConfig;
+  private final RowType rowType;
+  private final Boolean arePartitionRecordsSorted;
+  private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
+  private HoodieRowDataCreateHandle handle;
+  private String lastKnownPartitionPath = null;
+  private final String fileIdPrefix;
+  private int numFilesWritten = 0;
+  private final Map<String, HoodieRowDataCreateHandle> handles = new HashMap<>();
+  private final KeyGenerator keyGenerator;
+  private final Schema schema;
+  private final RowDataKeyGen keyGen;
+
+  public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
+                                String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType,
+                                boolean arePartitionRecordsSorted) {
+    this.hoodieTable = hoodieTable;
+    this.writeConfig = writeConfig;
+    this.instantTime = instantTime;
+    this.taskPartitionId = taskPartitionId;
+    this.taskId = taskId;
+    this.taskEpochId = taskEpochId;
+    this.rowType = addMetadataFields(rowType); // patch up with metadata fields
+    this.arePartitionRecordsSorted = arePartitionRecordsSorted;
+    this.fileIdPrefix = UUID.randomUUID().toString();
+    try {
+      this.keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(new TypedProperties(writeConfig.getProps()));
+    } catch (IOException e) {
+      throw new HoodieException("Create key generator error", e);
+    }
+    this.schema = getWriteSchema(writeConfig);
+    this.keyGen = RowDataKeyGen.instance(conf, rowType);
+  }
+
+  /**
+   * Returns the write instant time.
+   */
+  public String getInstantTime() {
+    return this.instantTime;
+  }
+
+  /**
+   * Get the schema of the actual write.
+   */
+  private static Schema getWriteSchema(HoodieWriteConfig config) {
+    return new Schema.Parser().parse(config.getWriteSchema());
+  }
+
+  public void write(RowData record) throws IOException {
+    try {
+      String recordKey = keyGen.getRecordKey(record);
+      String partitionPath = keyGen.getPartitionPath(record);
+
+      if ((lastKnownPartitionPath == null) || !lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) {
+        LOG.i

[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386366#comment-17386366
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

yuzhaojing commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r675673937



##
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
##
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Helper class for bulk insert used by Flink.
+ */
+public class BulkInsertWriterHelper {
+
+  private static final Logger LOG = LogManager.getLogger(BulkInsertWriterHelper.class);
+
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig writeConfig;
+  private final RowType rowType;
+  private final Boolean arePartitionRecordsSorted;
+  private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
+  private HoodieRowDataCreateHandle handle;
+  private String lastKnownPartitionPath = null;
+  private final String fileIdPrefix;
+  private int numFilesWritten = 0;
+  private final Map<String, HoodieRowDataCreateHandle> handles = new HashMap<>();
+  private final KeyGenerator keyGenerator;
+  private final Schema schema;

Review comment:
   These do not seem to be in use.

##
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
##
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logica

[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386264#comment-17386264
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * bec06d2304c67b544befff79bc6559520024f7b3 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1134)
   
   


[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386236#comment-17386236
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * d23d24f8cee6fd55de1e433f1c07b40a2e0a3391 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1127)
   * bec06d2304c67b544befff79bc6559520024f7b3 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1134)
   
   


[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386235#comment-17386235
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * d23d24f8cee6fd55de1e433f1c07b40a2e0a3391 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1127)
   * bec06d2304c67b544befff79bc6559520024f7b3 UNKNOWN
   
   


[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386176#comment-17386176
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

yuzhaojing commented on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885575380


   I'm excited to see this feature!




[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386138#comment-17386138
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * d23d24f8cee6fd55de1e433f1c07b40a2e0a3391 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1127)
   
   


[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386100#comment-17386100
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot edited a comment on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * d23d24f8cee6fd55de1e433f1c07b40a2e0a3391 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1127)
   
   


[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386097#comment-17386097
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

hudi-bot commented on pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#issuecomment-885497812


   
   ## CI report:
   
   * d23d24f8cee6fd55de1e433f1c07b40a2e0a3391 UNKNOWN
   
   
   




[jira] [Commented] (HUDI-2209) Bulk insert for flink writer

2021-07-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386094#comment-17386094
 ] 

ASF GitHub Bot commented on HUDI-2209:
--

danny0405 opened a new pull request #3334:
URL: https://github.com/apache/hudi/pull/3334


   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
   
   ## What is the purpose of the pull request
   
   *(For example: This pull request adds quick-start document.)*
   
   ## Brief change log
   
   *(for example:)*
 - *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   (or)
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
 - *Added integration tests for end-to-end.*
 - *Added HoodieClientWriteTest to verify the change.*
 - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
- [ ] Has a corresponding JIRA in PR title & commit
- [ ] Commit message is descriptive of the change
- [ ] CI is green
- [ ] Necessary doc changes done or have another open PR
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.




> Bulk insert for flink writer
> 
>
> Key: HUDI-2209
> URL: https://issues.apache.org/jira/browse/HUDI-2209
> Project: Apache Hudi
>  Issue Type: New Feature
>  Components: Flink Integration
>Reporter: Danny Chen
>Assignee: Danny Chen
>Priority: Major
> Fix For: 0.9.0
>
>



