This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new b64d22e [HUDI-1511] InstantGenerateOperator support multiple parallelism (#2434) b64d22e is described below commit b64d22e0478b967c83be22509beaaf8c5f114e19 Author: luokey <854194...@qq.com> AuthorDate: Fri Jan 22 09:17:50 2021 +0800 [HUDI-1511] InstantGenerateOperator support multiple parallelism (#2434) --- .../hudi/operator/InstantGenerateOperator.java | 170 +++++++++++++++------ 1 file changed, 123 insertions(+), 47 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java index 4e32ec7..7879243 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java @@ -38,10 +38,13 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.PathFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,9 +52,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * Operator helps to generate globally unique instant, it must be executed in one parallelism. 
Before generate a new @@ -71,16 +74,20 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord private String latestInstant = ""; private List<String> latestInstantList = new ArrayList<>(1); private transient ListState<String> latestInstantState; - private List<StreamRecord> bufferedRecords = new LinkedList(); - private transient ListState<StreamRecord> recordsState; private Integer retryTimes; private Integer retryInterval; + private static final String DELIMITER = "_"; + private static final String INSTANT_MARKER_FOLDER_NAME = ".instant_marker"; + private transient boolean isMain = false; + private transient AtomicLong recordCounter = new AtomicLong(0); + private StreamingRuntimeContext runtimeContext; + private int indexOfThisSubtask; @Override public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception { if (streamRecord.getValue() != null) { - bufferedRecords.add(streamRecord); output.collect(streamRecord); + recordCounter.incrementAndGet(); } } @@ -88,7 +95,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord public void open() throws Exception { super.open(); // get configs from runtimeContext - cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); + cfg = (HoodieFlinkStreamer.Config) runtimeContext.getExecutionConfig().getGlobalJobParameters(); // retry times retryTimes = Integer.valueOf(cfg.blockRetryTime); @@ -102,65 +109,78 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord // Hadoop FileSystem fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get()); - TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null); + if (isMain) { + TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(runtimeContext); - // writeClient - writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), 
StreamerUtil.getHoodieClientConfig(cfg), true); + // writeClient + writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true); - // init table, create it if not exists. - initTable(); + // init table, create it if not exists. + initTable(); + + // create instant marker directory + createInstantMarkerDir(); + } } @Override public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { super.prepareSnapshotPreBarrier(checkpointId); - // check whether the last instant is completed, if not, wait 10s and then throws an exception - if (!StringUtils.isNullOrEmpty(latestInstant)) { - doCheck(); - // last instant completed, set it empty - latestInstant = ""; - } - - // no data no new instant - if (!bufferedRecords.isEmpty()) { - latestInstant = startNewInstant(checkpointId); + String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get()); + Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName); + // mk marker file by each subtask + fs.create(path, true); + LOG.info("Subtask [{}] at checkpoint [{}] created marker file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName); + if (isMain) { + // check whether the last instant is completed, if not, wait 10s and then throws an exception + if (!StringUtils.isNullOrEmpty(latestInstant)) { + doCheck(); + // last instant completed, set it empty + latestInstant = ""; + } + boolean receivedDataInCurrentCP = checkReceivedData(checkpointId); + // no data no new instant + if (receivedDataInCurrentCP) { + latestInstant = startNewInstant(checkpointId); + } } } @Override public void initializeState(StateInitializationContext context) throws Exception { - // instantState - ListStateDescriptor<String> latestInstantStateDescriptor = new ListStateDescriptor<String>("latestInstant", String.class); - 
latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor); - - // recordState - ListStateDescriptor<StreamRecord> recordsStateDescriptor = new ListStateDescriptor<StreamRecord>("recordsState", StreamRecord.class); - recordsState = context.getOperatorStateStore().getListState(recordsStateDescriptor); - - if (context.isRestored()) { - Iterator<String> latestInstantIterator = latestInstantState.get().iterator(); - latestInstantIterator.forEachRemaining(x -> latestInstant = x); - LOG.info("InstantGenerateOperator initializeState get latestInstant [{}]", latestInstant); - - Iterator<StreamRecord> recordIterator = recordsState.get().iterator(); - bufferedRecords.clear(); - recordIterator.forEachRemaining(x -> bufferedRecords.add(x)); + runtimeContext = getRuntimeContext(); + indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask(); + isMain = indexOfThisSubtask == 0; + + if (isMain) { + // instantState + ListStateDescriptor<String> latestInstantStateDescriptor = new ListStateDescriptor<>("latestInstant", String.class); + latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor); + + if (context.isRestored()) { + Iterator<String> latestInstantIterator = latestInstantState.get().iterator(); + latestInstantIterator.forEachRemaining(x -> latestInstant = x); + LOG.info("Restoring the latest instant [{}] from the state", latestInstant); + } } } @Override public void snapshotState(StateSnapshotContext functionSnapshotContext) throws Exception { - if (latestInstantList.isEmpty()) { - latestInstantList.add(latestInstant); + long checkpointId = functionSnapshotContext.getCheckpointId(); + long recordSize = recordCounter.get(); + if (isMain) { + LOG.info("Update latest instant [{}] records size [{}] checkpointId [{}]", latestInstant, recordSize, checkpointId); + if (latestInstantList.isEmpty()) { + latestInstantList.add(latestInstant); + } else { + latestInstantList.set(0, latestInstant); + } + 
latestInstantState.update(latestInstantList); } else { - latestInstantList.set(0, latestInstant); + LOG.info("Task instance {} received {} records in checkpoint [{}]", indexOfThisSubtask, recordSize, checkpointId); } - latestInstantState.update(latestInstantList); - LOG.info("Update latest instant [{}]", latestInstant); - - recordsState.update(bufferedRecords); - LOG.info("Update records state size = [{}]", bufferedRecords.size()); - bufferedRecords.clear(); + recordCounter.set(0); } /** @@ -185,10 +205,10 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord int tryTimes = 0; while (tryTimes < retryTimes) { tryTimes++; - StringBuffer sb = new StringBuffer(); + StringBuilder sb = new StringBuilder(); if (rollbackPendingCommits.contains(latestInstant)) { rollbackPendingCommits.forEach(x -> sb.append(x).append(",")); - LOG.warn("Latest transaction [{}] is not completed! unCompleted transaction:[{}],try times [{}]", latestInstant, sb.toString(), tryTimes); + LOG.warn("Latest transaction [{}] is not completed! 
unCompleted transaction:[{}],try times [{}]", latestInstant, sb, tryTimes); TimeUnit.SECONDS.sleep(retryInterval); rollbackPendingCommits = writeClient.getInflightsAndRequestedInstants(commitType); } else { @@ -222,4 +242,60 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord fs.close(); } } + + private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException { + int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks(); + FileStatus[] fileStatuses; + Path instantMarkerPath = new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME); + // waiting all subtask create marker file ready + while (true) { + Thread.sleep(500L); + fileStatuses = fs.listStatus(instantMarkerPath, new PathFilter() { + @Override + public boolean accept(Path pathname) { + return pathname.getName().contains(String.format("%s%d%s", DELIMITER, checkpointId, DELIMITER)); + } + }); + + // is ready + if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) { + break; + } + } + + boolean receivedData = false; + // judge whether has data in this checkpoint and delete maker file. + for (FileStatus fileStatus : fileStatuses) { + Path path = fileStatus.getPath(); + String name = path.getName(); + // has data + if (Long.parseLong(name.split(DELIMITER)[2]) > 0) { + receivedData = true; + break; + } + } + + // delete all marker file + cleanMarkerDir(instantMarkerPath); + + return receivedData; + } + + private void createInstantMarkerDir() throws IOException { + // Always create instantMarkerFolder which is needed for InstantGenerateOperator + final Path instantMarkerFolder = new Path(new Path(cfg.targetBasePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME), INSTANT_MARKER_FOLDER_NAME); + if (!fs.exists(instantMarkerFolder)) { + fs.mkdirs(instantMarkerFolder); + } else { + // Clean marker dir. 
+ cleanMarkerDir(instantMarkerFolder); + } + } + + private void cleanMarkerDir(Path instantMarkerFolder) throws IOException { + FileStatus[] fileStatuses = fs.listStatus(instantMarkerFolder); + for (FileStatus fileStatus : fileStatuses) { + fs.delete(fileStatus.getPath(), true); + } + } }