[GitHub] [hudi] danny0405 commented on a change in pull request #2593: [HUDI-1632] Supports merge on read write mode for Flink writer

2021-03-01 Thread GitBox


danny0405 commented on a change in pull request #2593:
URL: https://github.com/apache/hudi/pull/2593#discussion_r581689075



##
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##
@@ -208,12 +210,32 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
 
   @Override
   public void commitCompaction(String compactionInstantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata) throws IOException {
-    throw new HoodieNotSupportedException("Compaction is not supported yet");
+    HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+    HoodieCommitMetadata metadata = FlinkCompactHelpers.newInstance().createCompactionMetadata(
+        table, compactionInstantTime, writeStatuses, config.getSchema());
+    extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
+    completeCompaction(metadata, writeStatuses, table, compactionInstantTime);
   }
 

Review comment:
   The `StreamWriteOperatorCoordinator.checkpointComplete` is the entry point for scheduling a compaction, but because the scheduling strategy there is pluggable, a scheduling attempt may produce an empty compaction plan (e.g. nothing to compact); that is why we say the compaction is async.
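
   To make that concrete, here is a minimal sketch of the flow (assuming the existing `HoodieFlinkWriteClient#scheduleCompaction(Option)` API; the class and helper names below are illustrative, not the coordinator's actual code):

```java
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.util.Option;

final class CompactionScheduleSketch {
  // called from a checkpoint-complete hook; nothing here blocks the checkpoint itself
  static void scheduleIfPossible(HoodieFlinkWriteClient<?> writeClient) {
    // the pluggable trigger strategy decides whether a plan is generated at all
    Option<String> instant = writeClient.scheduleCompaction(Option.empty());
    if (instant.isPresent()) {
      // a non-empty plan exists; the compaction operators execute it asynchronously later
      System.out.println("Compaction scheduled at instant " + instant.get());
    } else {
      // the strategy produced no plan this round, so there is simply nothing to do
      System.out.println("No compaction scheduled for this checkpoint");
    }
  }
}
```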









[GitHub] [hudi] danny0405 commented on a change in pull request #2593: [HUDI-1632] Supports merge on read write mode for Flink writer

2021-02-28 Thread GitBox


danny0405 commented on a change in pull request #2593:
URL: https://github.com/apache/hudi/pull/2593#discussion_r584416641



##
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactEvent.java
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.compact;
+
+import org.apache.hudi.common.model.CompactionOperation;
+
+import java.io.Serializable;
+
+/**
+ * Represents a compact command from the compaction plan task {@link 
CompactionPlanOperator}.
+ */
+public class CompactEvent implements Serializable {

Review comment:
   > Thanks @danny0405 for the awesome work. Hard to catch up on the review 
since you are making progress too fast :)
   > Can't go into detail about this large PR too much until I get a chance to 
run this myself. Left some high-level comments.
   > One concern is about the test cases. I feel like Flink writer is not as 
well tested as Spark, so the reliability is a bit concerning for me when we 
officially release this feature. Any plan to add more test cases?
   
   Yes, we can add more test cases as more features are introduced for Flink, such as the `SQL connectors`, `INSERT OVERWRITE`, and more kinds of key generators.









[GitHub] [hudi] danny0405 commented on a change in pull request #2593: [HUDI-1632] Supports merge on read write mode for Flink writer

2021-02-28 Thread GitBox


danny0405 commented on a change in pull request #2593:
URL: https://github.com/apache/hudi/pull/2593#discussion_r584411150



##
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactCommitEvent.java
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.compact;
+
+import org.apache.hudi.client.WriteStatus;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Represents a commit event from the compaction task {@link CompactFunction}.
+ */
+public class CompactCommitEvent implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The compaction commit instant time.
+   */
+  private final String instant;
+  /**
+   * The write statuses.
+   */
+  private final List<WriteStatus> writeStatuses;
+  /**
+   * The compaction task identifier.
+   */
+  private final int taskID;
+
+  public CompactCommitEvent(String instant, List<WriteStatus> writeStatuses, int taskID) {
+    this.instant = instant;
+    this.writeStatuses = writeStatuses;
+    this.taskID = taskID;
+  }
+
+  public String getInstant() {
+    return instant;
+  }
+
+  public List<WriteStatus> getWriteStatuses() {
+    return writeStatuses;
+  }
+
+  public int getTaskID() {

Review comment:
   It is not used in the current code, but I would rather keep it in case of future usage.









[GitHub] [hudi] danny0405 commented on a change in pull request #2593: [HUDI-1632] Supports merge on read write mode for Flink writer

2021-02-28 Thread GitBox


danny0405 commented on a change in pull request #2593:
URL: https://github.com/apache/hudi/pull/2593#discussion_r584410935



##
File path: hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java
##
@@ -165,6 +165,42 @@ private FlinkOptions() {
   .defaultValue(128D) // 128MB
   .withDescription("Batch buffer size in MB to flush data into the 
underneath filesystem");
 
+  // 
+  //  Compaction Options
+  // 
+
+  public static final ConfigOption<Boolean> WRITE_ASYNC_COMPACTION = ConfigOptions
+      .key("compaction.async.enabled")
+      .booleanType()
+      .defaultValue(true) // default true for MOR write
+      .withDescription("Async Compaction, enabled by default for MOR");
+
+  public static final String NUM_COMMITS = "num_commits";
+  public static final String TIME_ELAPSED = "time_elapsed";
+  public static final String NUM_AND_TIME = "num_and_time";
+  public static final String NUM_OR_TIME = "num_or_time";
+  public static final ConfigOption<String> COMPACTION_TRIGGER_STRATEGY = ConfigOptions
+      .key("compaction.trigger.strategy")

Review comment:
   No, the option keys of `HoodieCompactionConfig` are too long and not very friendly to use as SQL options.
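
   For illustration, compare the two spellings a user would have to type (the core key string below is an assumption based on `HoodieCompactionConfig`, quoted from memory):

```java
import org.apache.flink.configuration.Configuration;

final class OptionKeySketch {
  static Configuration sqlFriendly() {
    Configuration conf = new Configuration();
    // Flink-side key introduced by this PR: short enough for a SQL WITH clause.
    conf.setString("compaction.trigger.strategy", "num_commits");
    // The core Hudi key it maps to (assumed spelling, defined in HoodieCompactionConfig):
    // "hoodie.compact.inline.trigger.strategy" -- noticeably longer for users to type.
    return conf;
  }
}
```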









[GitHub] [hudi] danny0405 commented on a change in pull request #2593: [HUDI-1632] Supports merge on read write mode for Flink writer

2021-02-28 Thread GitBox


danny0405 commented on a change in pull request #2593:
URL: https://github.com/apache/hudi/pull/2593#discussion_r584410723



##
File path: hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java
##
@@ -165,6 +165,42 @@ private FlinkOptions() {
   .defaultValue(128D) // 128MB
   .withDescription("Batch buffer size in MB to flush data into the 
underneath filesystem");
 
+  // 
+  //  Compaction Options
+  // 
+
+  public static final ConfigOption<Boolean> WRITE_ASYNC_COMPACTION = ConfigOptions
+      .key("compaction.async.enabled")
+      .booleanType()
+      .defaultValue(true) // default true for MOR write
+      .withDescription("Async Compaction, enabled by default for MOR");
+
+  public static final String NUM_COMMITS = "num_commits";

Review comment:
   No, the enumeration comes from what the Hudi core defines; see `CompactionTriggerStrategy`.
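
   A hedged sketch of how the lower-case option values can line up with the core enum (the import path and the `resolve` helper here are assumptions for illustration, not the PR's actual wiring):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;

final class TriggerStrategySketch {
  // e.g. "num_commits" -> CompactionTriggerStrategy.NUM_COMMITS
  static CompactionTriggerStrategy resolve(Configuration conf) {
    String value = conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY);
    return CompactionTriggerStrategy.valueOf(value.toUpperCase());
  }
}
```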









[GitHub] [hudi] danny0405 commented on a change in pull request #2593: [HUDI-1632] Supports merge on read write mode for Flink writer

2021-02-28 Thread GitBox


danny0405 commented on a change in pull request #2593:
URL: https://github.com/apache/hudi/pull/2593#discussion_r584409961



##
File path: hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java
##
@@ -165,6 +165,42 @@ private FlinkOptions() {
   .defaultValue(128D) // 128MB
   .withDescription("Batch buffer size in MB to flush data into the 
underneath filesystem");
 
+  // 
+  //  Compaction Options
+  // 
+
+  public static final ConfigOption<Boolean> WRITE_ASYNC_COMPACTION = ConfigOptions

Review comment:
   Rename to `COMPACTION_ASYNC_ENABLED`.









[GitHub] [hudi] danny0405 commented on a change in pull request #2593: [HUDI-1632] Supports merge on read write mode for Flink writer

2021-02-26 Thread GitBox


danny0405 commented on a change in pull request #2593:
URL: https://github.com/apache/hudi/pull/2593#discussion_r583581482



##
File path: 
hudi-flink/src/test/java/org/apache/hudi/operator/CopyOnWriteTest.java
##
@@ -58,13 +57,13 @@
 /**
  * Test cases for StreamingSinkFunction.
  */
-public class StreamWriteFunctionTest {
+public class CopyOnWriteTest {

Review comment:
   Rename to `TestWriteCopyOnWrite`.









[GitHub] [hudi] danny0405 commented on a change in pull request #2593: [HUDI-1632] Supports merge on read write mode for Flink writer

2021-02-26 Thread GitBox


danny0405 commented on a change in pull request #2593:
URL: https://github.com/apache/hudi/pull/2593#discussion_r583580379



##
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigners.java
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.partitioner;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.partitioner.delta.DeltaBucketAssigner;
+
+/**
+ * Utilities for {@code BucketAssigner}.
+ */
+public abstract class BucketAssigners {
+
+  private BucketAssigners() {}
+
+  /**
+   * Creates a {@code BucketAssigner}.
+   *
+   * @param tableType The table type
+   * @param context   The engine context
+   * @param config    The configuration
+   * @return the bucket assigner instance
+   */
+  public static BucketAssigner create(
+      HoodieTableType tableType,
+      HoodieFlinkEngineContext context,
+      HoodieWriteConfig config) {
+    switch (tableType) {
+      case COPY_ON_WRITE:
+        return new BucketAssigner(context, config);
+      case MERGE_ON_READ:
+        return new DeltaBucketAssigner(context, config);
+      default:
+        throw new AssertionError();

Review comment:
   This is an internal exception which should never happen; I prefer a simple `AssertionError`, which signals a bug within Hudi rather than an unsupported feature.









[GitHub] [hudi] danny0405 commented on a change in pull request #2593: [HUDI-1632] Supports merge on read write mode for Flink writer

2021-02-26 Thread GitBox


danny0405 commented on a change in pull request #2593:
URL: https://github.com/apache/hudi/pull/2593#discussion_r583574546



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##
@@ -178,6 +178,14 @@ private void init(HoodieRecord record) {
 }
   }
 
+  /**
+   * Returns whether the hoodie record is an UPDATE.
+   */
+  protected boolean isUpdateRecord(HoodieRecord hoodieRecord) {
+    // If currentLocation is present, then this is an update
+    return hoodieRecord.getCurrentLocation() != null;

Review comment:
   This is the default logic for Hoodie and we should keep it; the `FlinkAppendHandle` already overrides the behavior.
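
   For context, a hedged sketch of what such an override can look like (the `"U"` instant-time marker is an assumption used here for illustration, not necessarily what the PR uses):

```java
// Inside a FlinkAppendHandle-like class. In the Flink writer every record already carries
// a location assigned by the bucket assigner, so the presence of a location alone cannot
// distinguish updates from inserts.
@Override
protected boolean isUpdateRecord(HoodieRecord hoodieRecord) {
  HoodieRecordLocation location = hoodieRecord.getCurrentLocation();
  // treat the record as an update only when the assigner explicitly tagged it as one
  return location != null && "U".equals(location.getInstantTime());
}
```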









[GitHub] [hudi] danny0405 commented on a change in pull request #2593: [HUDI-1632] Supports merge on read write mode for Flink writer

2021-02-24 Thread GitBox


danny0405 commented on a change in pull request #2593:
URL: https://github.com/apache/hudi/pull/2593#discussion_r581824995



##
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactionCommitSink.java
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.compact;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Function to check and commit the compaction action.
+ *
+ *  Each time after receiving a compaction commit event {@link 
CompactCommitEvent},
+ * it loads and checks the compaction plan {@link HoodieCompactionPlan},
+ * if all the compaction operations {@link 
org.apache.hudi.common.model.CompactionOperation}
+ * of the plan are finished, tries to commit the compaction action.
+ */
+public class CompactionCommitSink extends RichSinkFunction<CompactCommitEvent> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompactionCommitSink.class);
+
+  /**
+   * Config options.
+   */
+  private final Configuration conf;
+
+  /**
+   * Write Client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  /**
+   * Buffer to collect the event from each compact task {@code 
CompactFunction}.
+   */
+  private transient List<CompactCommitEvent> commitBuffer;
+
+  /**
+   * Current on-going compaction instant time.
+   */
+  private String compactionInstantTime;
+
+  public CompactionCommitSink(Configuration conf, int numTasks) {

Review comment:
   Yes, we can remove it in the current code base.
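
   As an aside, the check-and-commit flow the class javadoc above describes could look roughly like this (a hedged sketch; `loadPlan` is a hypothetical helper and the exact buffering structure may differ from the PR):

```java
@Override
public void invoke(CompactCommitEvent event, Context context) throws Exception {
  // buffer the event reported by this compact task
  commitBuffer.add(event);
  // load the plan for the on-going compaction instant (hypothetical helper)
  HoodieCompactionPlan plan = loadPlan(event.getInstant());
  int expected = plan.getOperations() == null ? 0 : plan.getOperations().size();
  // commit only when every operation of the plan has reported back
  if (commitBuffer.size() >= expected) {
    List<WriteStatus> statuses = commitBuffer.stream()
        .flatMap(e -> e.getWriteStatuses().stream())
        .collect(Collectors.toList());
    writeClient.commitCompaction(event.getInstant(), statuses, Option.empty());
    commitBuffer.clear();
  }
}
```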









[GitHub] [hudi] danny0405 commented on a change in pull request #2593: [HUDI-1632] Supports merge on read write mode for Flink writer

2021-02-24 Thread GitBox


danny0405 commented on a change in pull request #2593:
URL: https://github.com/apache/hudi/pull/2593#discussion_r581824673



##
File path: 
hudi-flink/src/test/java/org/apache/hudi/operator/MergeOnReadCompactTest.java
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test cases for delta stream write with compaction.
+ */
+public class MergeOnReadCompactTest extends CopyOnWriteTest {

Review comment:
   It is not very convenient because we have some custom strategies for each table type, which means the code is not 100% reused; IMHO, `ParameterizedTest` is more suitable for enumerating values.
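
   For example, a hedged sketch of the parameterized style (the test class and body here are illustrative, not the PR's actual tests):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.model.HoodieTableType;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

class WriteTableTypeSketchTest {
  @ParameterizedTest
  @EnumSource(HoodieTableType.class) // runs once for COPY_ON_WRITE and once for MERGE_ON_READ
  void testWrite(HoodieTableType tableType) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
    // ... drive the write pipeline with this conf and assert on the written data ...
  }
}
```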









[GitHub] [hudi] danny0405 commented on a change in pull request #2593: [HUDI-1632] Supports merge on read write mode for Flink writer

2021-02-23 Thread GitBox


danny0405 commented on a change in pull request #2593:
URL: https://github.com/apache/hudi/pull/2593#discussion_r581689075



##
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##
@@ -208,12 +210,32 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
 
   @Override
   public void commitCompaction(String compactionInstantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata) throws IOException {
-    throw new HoodieNotSupportedException("Compaction is not supported yet");
+    HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+    HoodieCommitMetadata metadata = FlinkCompactHelpers.newInstance().createCompactionMetadata(
+        table, compactionInstantTime, writeStatuses, config.getSchema());
+    extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
+    completeCompaction(metadata, writeStatuses, table, compactionInstantTime);
   }
 

Review comment:
   The `StreamWriteOperatorCoordinator.checkpointComplete` is the entry point for scheduling a compaction, but because the scheduling strategy there is pluggable, a scheduling attempt may produce an empty compaction plan (e.g. nothing to compact); that is why we say the compaction is async.









[GitHub] [hudi] danny0405 commented on a change in pull request #2593: [HUDI-1632] Supports merge on read write mode for Flink writer

2021-02-23 Thread GitBox


danny0405 commented on a change in pull request #2593:
URL: https://github.com/apache/hudi/pull/2593#discussion_r581683773



##
File path: 
hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
##
@@ -219,4 +226,81 @@ public void testWriteToHoodieLegacy() throws Exception {
 
 TestData.checkWrittenFullData(tempFile, EXPECTED);
   }
+
+  @Test
+  public void testMergeOnReadWriteWithCompaction() throws Exception {
+Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+conf.setString(FlinkOptions.TABLE_TYPE, 
HoodieTableType.MERGE_ON_READ.name());
+StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+execEnv.getConfig().disableObjectReuse();
+execEnv.setParallelism(4);
+// set up checkpoint interval
+execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+// Read from file source
+RowType rowType =
+(RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+.getLogicalType();
+StreamWriteOperatorFactory operatorFactory =
+new StreamWriteOperatorFactory<>(conf);
+
+JsonRowDataDeserializationSchema deserializationSchema = new 
JsonRowDataDeserializationSchema(
+rowType,
+new RowDataTypeInfo(rowType),
+false,
+true,
+TimestampFormat.ISO_8601
+);
+String sourcePath = Objects.requireNonNull(Thread.currentThread()
+.getContextClassLoader().getResource("test_source.data")).toString();
+
+TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+format.setFilesFilter(FilePathFilter.createDefaultFilter());
+TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+format.setCharsetName("UTF-8");
+
+DataStreamSink dataStream = execEnv
+// use PROCESS_CONTINUOUSLY mode to trigger checkpoint
+.readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 
1000, typeInfo)
+.map(record -> 
deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+.setParallelism(4)
+.map(new RowDataToHoodieFunction<>(rowType, conf), 
TypeInformation.of(HoodieRecord.class))
+// Key-by partition path, to avoid multiple subtasks write to a 
partition at the same time
+.keyBy(HoodieRecord::getPartitionPath)
+.transform(
+"bucket_assigner",
+TypeInformation.of(HoodieRecord.class),
+new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
+.uid("uid_bucket_assigner")
+// shuffle by fileId(bucket id)
+.keyBy(record -> record.getCurrentLocation().getFileId())
+.transform("hoodie_stream_write", TypeInformation.of(Object.class), 
operatorFactory)
+.uid("uid_hoodie_stream_write")
+.transform("compact_plan_generate",
+TypeInformation.of(CompactEvent.class),
+new CompactionPlanOperator(conf))
+.uid("uid_compact_plan_generate")
+.setParallelism(1) // plan generate must be singleton

Review comment:
   Nice idea ~









[GitHub] [hudi] danny0405 commented on a change in pull request #2593: [HUDI-1632] Supports merge on read write mode for Flink writer

2021-02-23 Thread GitBox


danny0405 commented on a change in pull request #2593:
URL: https://github.com/apache/hudi/pull/2593#discussion_r581683689



##
File path: 
hudi-flink/src/test/java/org/apache/hudi/operator/MergeOnReadCompactTest.java
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test cases for delta stream write with compaction.
+ */
+public class MergeOnReadCompactTest extends CopyOnWriteTest {

Review comment:
   Sorry, I think there is no need to have another base class here, because the base class should hold the full set of test cases for the `COPY_ON_WRITE` table type.









[GitHub] [hudi] danny0405 commented on a change in pull request #2593: [HUDI-1632] Supports merge on read write mode for Flink writer

2021-02-23 Thread GitBox


danny0405 commented on a change in pull request #2593:
URL: https://github.com/apache/hudi/pull/2593#discussion_r581616356



##
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##
@@ -208,12 +210,32 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
 
   @Override
   public void commitCompaction(String compactionInstantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata) throws IOException {
-    throw new HoodieNotSupportedException("Compaction is not supported yet");
+    HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+    HoodieCommitMetadata metadata = FlinkCompactHelpers.newInstance().createCompactionMetadata(
+        table, compactionInstantTime, writeStatuses, config.getSchema());
+    extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
+    completeCompaction(metadata, writeStatuses, table, compactionInstantTime);
   }
 

Review comment:
   You can take `StreamWriteITCase#testMergeOnReadWriteWithCompaction` as an example.




