This is an automated email from the ASF dual-hosted git repository. vinoyang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 9804662 [HUDI-1738] Emit deletes for flink MOR table streaming read (#2742) 9804662 is described below commit 9804662bc8e17d6936c20326f17ec7c0360dcaf6 Author: Danny Chan <yuzhao....@gmail.com> AuthorDate: Thu Apr 1 15:25:31 2021 +0800 [HUDI-1738] Emit deletes for flink MOR table streaming read (#2742) Currently we do a soft delete for DELETE row data when writing into the hoodie table. For streaming reads of a MOR table, the Flink reader detects the delete records and still emits them as long as the record key semantics are kept. This is useful, and actually a must, for incremental computation in streaming ETL pipelines. --- .../java/org/apache/hudi/keygen/KeyGenUtils.java | 27 +++++ .../apache/hudi/client/HoodieFlinkWriteClient.java | 4 +- .../table/timeline/HoodieActiveTimeline.java | 23 +++- .../org/apache/hudi/sink/StreamWriteFunction.java | 14 +-- .../hudi/sink/StreamWriteOperatorCoordinator.java | 73 +++---------- .../apache/hudi/sink/compact/CompactFunction.java | 14 +-- .../hudi/sink/compact/CompactionCommitSink.java | 14 +-- .../hudi/sink/compact/CompactionPlanOperator.java | 45 +++++--- .../hudi/sink/event/BatchWriteSuccessEvent.java | 5 +- .../org/apache/hudi/table/HoodieTableSource.java | 27 +++-- .../table/format/mor/MergeOnReadInputFormat.java | 119 +++++++++++++++++++-- .../table/format/mor/MergeOnReadTableState.java | 33 +++++- .../apache/hudi/util/StringToRowDataConverter.java | 107 ++++++++++++++++++ .../sink/TestStreamWriteOperatorCoordinator.java | 6 +- .../apache/hudi/source/TestStreamReadOperator.java | 18 ++-- .../apache/hudi/table/HoodieDataSourceITCase.java | 35 ++++++ .../apache/hudi/table/format/TestInputFormat.java | 25 +++++ .../test/java/org/apache/hudi/utils/TestData.java | 8 +- .../hudi/utils/TestStringToRowDataConverter.java | 107 ++++++++++++++++++ .../utils/factory/CollectSinkTableFactory.java | 11 +- 20 files changed, 557 insertions(+), 158 deletions(-) diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java index 1f59bab..4773ef1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.List; public class KeyGenUtils { @@ -41,6 +42,32 @@ public class KeyGenUtils { protected static final String DEFAULT_PARTITION_PATH = "default"; protected static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/"; + /** + * Extracts the record key fields in strings out of the given record key, + * this is the reverse operation of {@link #getRecordKey(GenericRecord, String)}. + * + * @see SimpleAvroKeyGenerator + * @see org.apache.hudi.keygen.ComplexAvroKeyGenerator + */ + public static String[] extractRecordKeys(String recordKey) { + String[] fieldKV = recordKey.split(","); + if (fieldKV.length == 1) { + return fieldKV; + } else { + // a complex key + return Arrays.stream(fieldKV).map(kv -> { + final String[] kvArray = kv.split(":"); + if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) { + return null; + } else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) { + return ""; + } else { + return kvArray[1]; + } + }).toArray(String[]::new); + } + } + public static String getRecordKey(GenericRecord record, List<String> recordKeyFields) { boolean keyIsNullEmpty = true; StringBuilder recordKey = new StringBuilder(); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index de4d8ca..6a6bced 100644 --- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -429,8 +429,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends HoodieFlinkTable<T> table = getHoodieTable(); String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType)); HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline(); - activeTimeline.deletePending(HoodieInstant.State.INFLIGHT, commitType, instant); - activeTimeline.deletePending(HoodieInstant.State.REQUESTED, commitType, instant); + activeTimeline.deletePendingIfExists(HoodieInstant.State.INFLIGHT, commitType, instant); + activeTimeline.deletePendingIfExists(HoodieInstant.State.REQUESTED, commitType, instant); } public void transitionRequestedToInflight(String tableType, String inFlightInstant) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 25ea95e..9ff50fe 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -173,10 +173,10 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { deleteInstantFile(instant); } - public void deletePending(HoodieInstant.State state, String action, String instantStr) { + public void deletePendingIfExists(HoodieInstant.State state, String action, String instantStr) { HoodieInstant instant = new HoodieInstant(state, action, instantStr); ValidationUtils.checkArgument(!instant.isCompleted()); - deleteInstantFile(instant); + deleteInstantFileIfExists(instant); } public void deleteCompactionRequested(HoodieInstant instant) { @@ -185,6 +185,25 @@ public class HoodieActiveTimeline extends 
HoodieDefaultTimeline { deleteInstantFile(instant); } + private void deleteInstantFileIfExists(HoodieInstant instant) { + LOG.info("Deleting instant " + instant); + Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), instant.getFileName()); + try { + if (metaClient.getFs().exists(inFlightCommitFilePath)) { + boolean result = metaClient.getFs().delete(inFlightCommitFilePath, false); + if (result) { + LOG.info("Removed instant " + instant); + } else { + throw new HoodieIOException("Could not delete instant " + instant); + } + } else { + LOG.warn("The commit " + inFlightCommitFilePath + " to remove does not exist"); + } + } catch (IOException e) { + throw new HoodieIOException("Could not remove inflight commit " + inFlightCommitFilePath, e); + } + } + private void deleteInstantFile(HoodieInstant instant) { LOG.info("Deleting instant " + instant); Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), instant.getFileName()); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 53104fb..364d28e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -18,11 +18,8 @@ package org.apache.hudi.sink; -import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.ObjectSizeCalculator; @@ -160,8 +157,8 @@ public class StreamWriteFunction<K, I, O> @Override public void open(Configuration parameters) throws IOException { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); + 
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); initBuffer(); - initWriteClient(); initWriteFunction(); } @@ -254,15 +251,6 @@ public class StreamWriteFunction<K, I, O> this.addToBufferCondition = this.bufferLock.newCondition(); } - private void initWriteClient() { - HoodieFlinkEngineContext context = - new HoodieFlinkEngineContext( - new SerializableConfiguration(StreamerUtil.getHadoopConf()), - new FlinkTaskContextSupplier(getRuntimeContext())); - - writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(this.config)); - } - private void initWriteFunction() { final String writeOperation = this.config.get(FlinkOptions.OPERATION); switch (WriteOperationType.fromValue(writeOperation)) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index ad06617..51149e2 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -18,11 +18,11 @@ package org.apache.hudi.sink; -import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.event.BatchWriteSuccessEvent; @@ -54,7 +54,6 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static 
org.apache.hudi.util.StreamerUtil.initTableIfNotExists; @@ -138,7 +137,7 @@ public class StreamWriteOperatorCoordinator // initialize event buffer reset(); // writeClient - initWriteClient(); + this.writeClient = StreamerUtil.createWriteClient(conf, null); // init table, create it if not exists. initTableIfNotExists(this.conf); // start a new instant @@ -178,7 +177,10 @@ public class StreamWriteOperatorCoordinator @Override public void notifyCheckpointComplete(long checkpointId) { // start to commit the instant. - checkAndCommitWithRetry(); + final String errorMsg = String.format("Instant [%s] has a complete checkpoint [%d],\n" + + "but the coordinator has not received full write success events,\n" + + "rolls back the instant and rethrow", this.instant, checkpointId); + checkAndForceCommit(errorMsg); // if async compaction is on, schedule the compaction if (needsScheduleCompaction) { writeClient.scheduleCompaction(Option.empty()); @@ -226,10 +228,14 @@ public class StreamWriteOperatorCoordinator @Override public void handleEventFromOperator(int i, OperatorEvent operatorEvent) { // no event to handle - Preconditions.checkState(operatorEvent instanceof BatchWriteSuccessEvent, + ValidationUtils.checkState(operatorEvent instanceof BatchWriteSuccessEvent, "The coordinator can only handle BatchWriteSuccessEvent"); BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent; - Preconditions.checkState(event.getInstantTime().equals(this.instant), + // the write task does not block after checkpointing(and before it receives a checkpoint success event), + // if it it checkpoints succeed then flushes the data buffer again before this coordinator receives a checkpoint + // success event, the data buffer would flush with an older instant time. 
+ ValidationUtils.checkState( + HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()), String.format("Receive an unexpected event for instant %s from task %d", event.getInstantTime(), event.getTaskID())); if (this.eventBuffer[event.getTaskID()] != null) { @@ -258,14 +264,6 @@ public class StreamWriteOperatorCoordinator // Utilities // ------------------------------------------------------------------------- - @SuppressWarnings("rawtypes") - private void initWriteClient() { - writeClient = new HoodieFlinkWriteClient( - new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), - StreamerUtil.getHoodieClientConfig(this.conf), - true); - } - private void initHiveSync() { this.executor = new NonThrownExecutor(); this.hiveSyncContext = HiveSyncContext.create(conf); @@ -338,51 +336,6 @@ public class StreamWriteOperatorCoordinator doCommit(); } - @SuppressWarnings("unchecked") - private void checkAndCommitWithRetry() { - int retryTimes = this.conf.getInteger(FlinkOptions.RETRY_TIMES); - if (retryTimes < 0) { - retryTimes = 1; - } - long retryIntervalMillis = this.conf.getLong(FlinkOptions.RETRY_INTERVAL_MS); - int tryTimes = 0; - while (tryTimes++ < retryTimes) { - try { - if (!checkReady()) { - // Do not throw if the try times expires but the event buffer are still not ready, - // because we have a force check when next checkpoint starts. - if (tryTimes == retryTimes) { - // Throw if the try times expires but the event buffer are still not ready - throw new HoodieException("Try " + retryTimes + " to commit instant [" + this.instant + "] failed"); - } - sleepFor(retryIntervalMillis); - continue; - } - doCommit(); - return; - } catch (Throwable throwable) { - String cause = throwable.getCause() == null ? 
"" : throwable.getCause().toString(); - LOG.warn("Try to commit the instant {} failed, with times {} and cause {}", this.instant, tryTimes, cause); - if (tryTimes == retryTimes) { - throw new HoodieException("Not all write tasks finish the batch write to commit", throwable); - } - sleepFor(retryIntervalMillis); - } - } - } - - /** - * Sleep {@code intervalMillis} milliseconds in current thread. - */ - private void sleepFor(long intervalMillis) { - try { - TimeUnit.MILLISECONDS.sleep(intervalMillis); - } catch (InterruptedException e) { - LOG.error("Thread interrupted while waiting to retry the instant commits"); - throw new HoodieException(e); - } - } - /** Checks the buffer is ready to commit. */ private boolean checkReady() { return Arrays.stream(eventBuffer) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java index d0fa51b..7f4f7b9 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java @@ -18,11 +18,8 @@ package org.apache.hudi.sink.compact; -import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable; import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor; @@ -62,7 +59,7 @@ public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEv @Override public void open(Configuration parameters) throws Exception { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); - initWriteClient(); + this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); } 
@Override @@ -82,13 +79,4 @@ public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEv instantTime); collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID)); } - - private void initWriteClient() { - HoodieFlinkEngineContext context = - new HoodieFlinkEngineContext( - new SerializableConfiguration(StreamerUtil.getHadoopConf()), - new FlinkTaskContextSupplier(getRuntimeContext())); - - writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(conf)); - } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java index 86529a1..86dae20 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java @@ -19,11 +19,8 @@ package org.apache.hudi.sink.compact; import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -86,7 +83,7 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); - initWriteClient(); + this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); this.commitBuffer = new ArrayList<>(); } @@ -142,13 +139,4 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> { this.commitBuffer.clear(); this.compactionInstantTime = null; } - - private void 
initWriteClient() { - HoodieFlinkEngineContext context = - new HoodieFlinkEngineContext( - new SerializableConfiguration(StreamerUtil.getHadoopConf()), - new FlinkTaskContextSupplier(getRuntimeContext())); - - writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(this.conf)); - } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java index 6eebe94..e48f4ed 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java @@ -19,15 +19,14 @@ package org.apache.hudi.sink.compact; import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.HoodieFlinkWriteClient; -import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.StreamerUtil; @@ -37,6 +36,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.hadoop.fs.Path; import java.io.IOException; import java.util.List; @@ -74,7 +74,7 @@ public class CompactionPlanOperator extends 
AbstractStreamOperator<CompactionPla @Override public void open() throws Exception { super.open(); - initWriteClient(); + this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); } @Override @@ -113,8 +113,16 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); if (!pendingCompactionTimeline.containsInstant(instant)) { - throw new IllegalStateException( - "No Compaction request available at " + compactionInstantTime + " to run compaction"); + // this means that the compaction plan was written to auxiliary path(.tmp) + // but not the meta path(.hoodie), this usually happens when the job crush + // exceptionally. + + // clean the compaction plan in auxiliary path and cancels the compaction. + + LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n" + + "Clean the compaction plan in auxiliary path and cancels the compaction"); + cleanInstant(table.getMetaClient(), instant); + return; } // Mark instant as compaction inflight @@ -130,17 +138,24 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla } } + private void cleanInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) { + Path commitFilePath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName()); + try { + if (metaClient.getFs().exists(commitFilePath)) { + boolean deleted = metaClient.getFs().delete(commitFilePath, false); + if (deleted) { + LOG.info("Removed instant " + instant); + } else { + throw new HoodieIOException("Could not delete instant " + instant); + } + } + } catch (IOException e) { + throw new HoodieIOException("Could not remove requested commit " + commitFilePath, e); + } + } + @VisibleForTesting public void 
setOutput(Output<StreamRecord<CompactionPlanEvent>> output) { this.output = output; } - - private void initWriteClient() { - HoodieFlinkEngineContext context = - new HoodieFlinkEngineContext( - new SerializableConfiguration(StreamerUtil.getHadoopConf()), - new FlinkTaskContextSupplier(getRuntimeContext())); - - writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(this.conf)); - } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java index dd40de2..186a470 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java @@ -35,7 +35,7 @@ public class BatchWriteSuccessEvent implements OperatorEvent { private List<WriteStatus> writeStatuses; private final int taskID; - private final String instantTime; + private String instantTime; private boolean isLastBatch; /** * Flag saying whether the event comes from the end of input, e.g. the source @@ -102,8 +102,9 @@ public class BatchWriteSuccessEvent implements OperatorEvent { * @param other The event to be merged */ public void mergeWith(BatchWriteSuccessEvent other) { - ValidationUtils.checkArgument(this.instantTime.equals(other.instantTime)); ValidationUtils.checkArgument(this.taskID == other.taskID); + // the instant time could be monotonically increasing + this.instantTime = other.instantTime; this.isLastBatch |= other.isLastBatch; // true if one of the event isLastBatch true. 
List<WriteStatus> statusList = new ArrayList<>(); statusList.addAll(this.writeStatuses); diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index f92e1a8..ada9e01 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -191,9 +191,9 @@ public class HoodieTableSource implements } else { InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo); DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo); - return source.name("streaming_source") + return source.name("bounded_source") .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)) - .uid("uid_streaming_source"); + .uid("uid_bounded_source"); } } }; @@ -363,21 +363,26 @@ public class HoodieTableSource implements requiredRowType, tableAvroSchema.toString(), AvroSchemaConverter.convertToSchema(requiredRowType).toString(), - inputSplits); - return new MergeOnReadInputFormat( - this.conf, - FilePathUtils.toFlinkPaths(paths), - hoodieTableState, - rowDataType.getChildren(), // use the explicit fields data type because the AvroSchemaConverter is not very stable. - "default", - this.limit); + inputSplits, + conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",")); + return MergeOnReadInputFormat.builder() + .config(this.conf) + .paths(FilePathUtils.toFlinkPaths(paths)) + .tableState(hoodieTableState) + // use the explicit fields data type because the AvroSchemaConverter + // is not very stable. 
+ .fieldTypes(rowDataType.getChildren()) + .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME)) + .limit(this.limit) + .emitDelete(isStreaming) + .build(); case COPY_ON_WRITE: FileInputFormat<RowData> format = new CopyOnWriteInputFormat( FilePathUtils.toFlinkPaths(paths), this.schema.getFieldNames(), this.schema.getFieldDataTypes(), this.requiredPos, - "default", + this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value getParquetConf(this.conf, this.hadoopConf), this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE) diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index a3a6f85..12bebdf 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.table.format.FilePathUtils; import org.apache.hudi.table.format.FormatUtils; import org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader; @@ -30,22 +31,28 @@ import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil; import org.apache.hudi.util.AvroToRowDataConverters; import org.apache.hudi.util.RowDataToAvroConverters; import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.util.StringToRowDataConverter; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.generic.IndexedRecord; +import 
org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.io.DefaultInputSplitAssigner; import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; import java.io.IOException; +import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; @@ -116,13 +123,20 @@ public class MergeOnReadInputFormat */ private long currentReadCount = 0; - public MergeOnReadInputFormat( + /** + * Flag saying whether to emit the deletes. In streaming read mode, downstream + * operators need the delete messages to retract the legacy accumulator. + */ + private boolean emitDelete; + + private MergeOnReadInputFormat( Configuration conf, Path[] paths, MergeOnReadTableState tableState, List<DataType> fieldTypes, String defaultPartName, - long limit) { + long limit, + boolean emitDelete) { this.conf = conf; this.paths = paths; this.tableState = tableState; @@ -133,6 +147,14 @@ public class MergeOnReadInputFormat // because we need to this.requiredPos = tableState.getRequiredPositions(); this.limit = limit; + this.emitDelete = emitDelete; + } + + /** + * Returns the builder for {@link MergeOnReadInputFormat}. 
+ */ + public static Builder builder() { + return new Builder(); } @Override @@ -177,7 +199,7 @@ public class MergeOnReadInputFormat if (filePath == null) { throw new IllegalArgumentException("File path was not specified in input format or configuration."); } else { - this.paths = new Path[] { new Path(filePath) }; + this.paths = new Path[] {new Path(filePath)}; } } // may supports nested files in the future. @@ -269,6 +291,13 @@ public class MergeOnReadInputFormat final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords = FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords(); final Iterator<String> logRecordsKeyIterator = logRecords.keySet().iterator(); + final int[] pkOffset = tableState.getPkOffsetsInRequired(); + // flag saying whether the pk semantics has been dropped by user specified + // projections. For e.g, if the pk fields are [a, b] but user only select a, + // then the pk semantics is lost. + final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset -> offset == -1); + final LogicalType[] pkTypes = pkSemanticLost ? null : tableState.getPkTypes(pkOffset); + final StringToRowDataConverter converter = pkSemanticLost ? 
null : new StringToRowDataConverter(pkTypes); return new Iterator<RowData>() { private RowData currentRecord; @@ -278,14 +307,30 @@ public class MergeOnReadInputFormat if (logRecordsKeyIterator.hasNext()) { String curAvrokey = logRecordsKeyIterator.next(); Option<IndexedRecord> curAvroRecord = null; + final HoodieRecord<?> hoodieRecord = logRecords.get(curAvrokey); try { - curAvroRecord = logRecords.get(curAvrokey).getData().getInsertValue(tableSchema); + curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema); } catch (IOException e) { throw new HoodieException("Get avro insert value error for key: " + curAvrokey, e); } if (!curAvroRecord.isPresent()) { - // delete record found, skipping - return hasNext(); + if (emitDelete && !pkSemanticLost) { + GenericRowData delete = new GenericRowData(tableState.getRequiredRowType().getFieldCount()); + + final String recordKey = hoodieRecord.getRecordKey(); + final String[] pkFields = KeyGenUtils.extractRecordKeys(recordKey); + final Object[] converted = converter.convert(pkFields); + for (int i = 0; i < pkOffset.length; i++) { + delete.setField(pkOffset[i], converted[i]); + } + delete.setRowKind(RowKind.DELETE); + + this.currentRecord = delete; + return true; + } else { + // delete record found, skipping + return hasNext(); + } } else { // should improve the code when log scanner supports // seeking by log blocks with commit time which is more @@ -318,6 +363,10 @@ public class MergeOnReadInputFormat }; } + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + private interface RecordIterator { boolean reachedEnd() throws IOException; @@ -521,4 +570,62 @@ public class MergeOnReadInputFormat return logRecords.get(curKey).getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema); } } + + /** + * Builder for {@link MergeOnReadInputFormat}. 
+ */ + public static class Builder { + private Configuration conf; + private Path[] paths; + private MergeOnReadTableState tableState; + private List<DataType> fieldTypes; + private String defaultPartName; + private long limit = -1; /* value used when limit(long) is never called */ + private boolean emitDelete = false; /* off by default; when set, the format emits delete records (HUDI-1738) */ + + public Builder config(Configuration conf) { + this.conf = conf; + return this; + } + + public Builder paths(Path[] paths) { + this.paths = paths; + return this; + } + + public Builder tableState(MergeOnReadTableState tableState) { + this.tableState = tableState; + return this; + } + + public Builder fieldTypes(List<DataType> fieldTypes) { + this.fieldTypes = fieldTypes; + return this; + } + + public Builder defaultPartName(String defaultPartName) { + this.defaultPartName = defaultPartName; + return this; + } + + public Builder limit(long limit) { + this.limit = limit; + return this; + } + + public Builder emitDelete(boolean emitDelete) { + this.emitDelete = emitDelete; + return this; + } + + public MergeOnReadInputFormat build() { /* passes every field through as-is; unset fields are not validated here */ + return new MergeOnReadInputFormat(conf, paths, tableState, + fieldTypes, defaultPartName, limit, emitDelete); + } + } + + @VisibleForTesting + public void isEmitDelete(boolean emitDelete) { /* NOTE(review): a setter despite the is- prefix; assigns emitDelete after construction */ + this.emitDelete = emitDelete; + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java index 7dedcda..9a32af6 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java @@ -18,9 +18,11 @@ package org.apache.hudi.table.format.mor; +import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import java.io.Serializable; +import java.util.Arrays; import java.util.List; /** @@ -35,18 +37,21 @@ public class MergeOnReadTableState implements Serializable { private final String
avroSchema; private final String requiredAvroSchema; private final List<MergeOnReadInputSplit> inputSplits; + private final String[] pkFields; public MergeOnReadTableState( RowType rowType, RowType requiredRowType, String avroSchema, String requiredAvroSchema, - List<MergeOnReadInputSplit> inputSplits) { + List<MergeOnReadInputSplit> inputSplits, + String[] pkFields) { /* primary key field names; may be empty */ this.rowType = rowType; this.requiredRowType = requiredRowType; this.avroSchema = avroSchema; this.requiredAvroSchema = requiredAvroSchema; this.inputSplits = inputSplits; + this.pkFields = pkFields; } public RowType getRowType() { @@ -76,4 +81,30 @@ public class MergeOnReadTableState implements Serializable { .mapToInt(i -> i) .toArray(); } + + /** + * Get the primary key positions in required row type; a position is -1 when that pk field is absent from it (List#indexOf semantics). + */ + public int[] getPkOffsetsInRequired() { + final List<String> fieldNames = requiredRowType.getFieldNames(); + return Arrays.stream(pkFields) + .map(fieldNames::indexOf) + .mapToInt(i -> i) + .toArray(); + } + + /** + * Returns the primary key fields logical type with given offsets; each offset indexes the required row type directly. + * + * @param pkOffsets the pk offsets in required row type, none of which may be -1 (that would throw ArrayIndexOutOfBoundsException) + * @return pk field logical types + * + * @see #getPkOffsetsInRequired() + */ + public LogicalType[] getPkTypes(int[] pkOffsets) { + final LogicalType[] requiredTypes = requiredRowType.getFields().stream() + .map(RowType.RowField::getType).toArray(LogicalType[]::new); + return Arrays.stream(pkOffsets).mapToObj(offset -> requiredTypes[offset]) + .toArray(LogicalType[]::new); + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java b/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java new file mode 100644 index 0000000..b30d6d6 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.common.util.ValidationUtils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; + +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.time.LocalDate; +import java.util.Arrays; + +/** + * Converts a string array (e.g. the parts of a record key) into internal row data field values. + * It is stateful (not a pure stateless tool): one converter per field type is built once in the constructor and reused; + * a {@code null} value converts to {@code null}, and unsupported field types are rejected with UnsupportedOperationException.
+ */ +@Internal +public class StringToRowDataConverter { + private final Converter[] converters; + + public StringToRowDataConverter(LogicalType[] fieldTypes) { + this.converters = Arrays.stream(fieldTypes) + .map(StringToRowDataConverter::getConverter) + .toArray(Converter[]::new); + } + + public Object[] convert(String[] fields) { + ValidationUtils.checkArgument(converters.length == fields.length, + "The number of field values should equal the number of field types"); + // null values (e.g. possibly null parts of a complex record key) bypass the converters: parseInt etc. would throw and parseBoolean would yield false + Object[] converted = new Object[fields.length]; + for (int i = 0; i < fields.length; i++) { + converted[i] = fields[i] == null ? null : converters[i].convert(fields[i]); + } + return converted; + } + + private interface Converter { + Object convert(String field); + } + + private static Converter getConverter(LogicalType logicalType) { + switch (logicalType.getTypeRoot()) { + case NULL: + return field -> null; + case TINYINT: + return Byte::parseByte; + case SMALLINT: + return Short::parseShort; + case BOOLEAN: + return Boolean::parseBoolean; + case INTEGER: + case TIME_WITHOUT_TIME_ZONE: + return Integer::parseInt; + case BIGINT: + return Long::parseLong; + case FLOAT: + return Float::parseFloat; + case DOUBLE: + return Double::parseDouble; + case DATE: + // see HoodieAvroUtils#convertValueForAvroLogicalTypes + return field -> (int) LocalDate.parse(field).toEpochDay(); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return field -> TimestampData.fromEpochMillis(Long.parseLong(field)); + case CHAR: + case VARCHAR: + return StringData::fromString; + case BINARY: + case VARBINARY: + return field -> field.getBytes(StandardCharsets.UTF_8); + case DECIMAL: + DecimalType decimalType = (DecimalType) logicalType; + return field -> + DecimalData.fromBigDecimal( + new BigDecimal(field), + decimalType.getPrecision(), + decimalType.getScale()); + default: + throw new UnsupportedOperationException( + "Unsupported type " + logicalType.getTypeRoot() + " for " + StringToRowDataConverter.class.getName()); + } + } +} diff --git
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index 90874f1..321abfa 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -137,7 +137,7 @@ public class TestStreamWriteOperatorCoordinator { } @Test - public void testCheckpointCompleteWithRetry() { + public void testCheckpointCompleteWithException() { final CompletableFuture<byte[]> future = new CompletableFuture<>(); coordinator.checkpointCoordinator(1, future); String inflightInstant = coordinator.getInstant(); @@ -149,7 +149,9 @@ public class TestStreamWriteOperatorCoordinator { coordinator.handleEventFromOperator(0, event); assertThrows(HoodieException.class, () -> coordinator.notifyCheckpointComplete(1), - "Try 3 to commit instant"); + "org.apache.hudi.exception.HoodieException: Instant [20210330153432] has a complete checkpoint [1],\n" + + "but the coordinator has not received full write success events,\n" + + "rolls back the instant and rethrow"); /* NOTE(review): the 3rd assertThrows argument is only the failure message reported when nothing is thrown; it does not verify the exception message */ } @Test diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java index aaeed94..4abc79a 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java @@ -260,15 +260,17 @@ public class TestStreamReadOperator { TestConfigurations.ROW_TYPE, tableAvroSchema.toString(), AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(), - Collections.emptyList()); + Collections.emptyList(), + new String[0]); /* pkFields: none */ Path[] paths = FilePathUtils.getReadPaths(new Path(basePath), conf, hadoopConf, partitionKeys); - MergeOnReadInputFormat inputFormat = new MergeOnReadInputFormat( - conf, -
FilePathUtils.toFlinkPaths(paths), - hoodieTableState, - rowDataType.getChildren(), - "default", - 1000L); + MergeOnReadInputFormat inputFormat = MergeOnReadInputFormat.builder() + .config(conf) + .paths(FilePathUtils.toFlinkPaths(paths)) + .tableState(hoodieTableState) + .fieldTypes(rowDataType.getChildren()) + .defaultPartName("default").limit(1000L) + .emitDelete(true) + .build(); OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory(inputFormat); OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness = new OneInputStreamOperatorTestHarness<>( diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 218c742..ffb9859 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -208,6 +208,41 @@ public class HoodieDataSourceITCase extends AbstractTestBase { "some commits should be cleaned"); } + @Test + void testStreamReadWithDeletes() throws Exception { + // write MERGE_ON_READ table t1 directly through TestData, then stream-read it starting from its latest commit + + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setString(FlinkOptions.TABLE_NAME, "t1"); + conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + + // write one commit + TestData.writeData(TestData.DATA_SET_INSERT, conf); + // write another commit with deletes + TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf); + + String latestCommit = StreamerUtil.createWriteClient(conf, null) /* NOTE(review): this write client is never closed */ + .getLastCompletedInstant(FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + + Map<String, String> options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); +
options.put(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.key(), "2"); + options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), latestCommit); + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); + streamTableEnv.executeSql(hoodieTableDDL); + + List<Row> result = execSelectSql(streamTableEnv, "select * from t1", 10); + final String expected = "[" + + "id1,Danny,24,1970-01-01T00:00:00.001,par1, " + + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, " + + "id3,null,null,null,null, " + + "id5,null,null,null,null, " + + "id9,null,null,null,null]"; /* deleted keys id3, id5, id9 come back with only the key column set */ + assertRowsEquals(result, expected); + } + @ParameterizedTest @EnumSource(value = ExecMode.class) void testWriteAndRead(ExecMode execMode) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index ae15aba..5e324de 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -21,6 +21,7 @@ package org.apache.hudi.table.format; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.table.HoodieTableSource; +import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; @@ -43,6 +44,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -151,6 +153,29 @@ public class TestInputFormat { assertThat(actual, is(expected)); } + @Test + void testReadWithDeletes() throws Exception { + beforeEach(HoodieTableType.MERGE_ON_READ); + + // write another commit to read again +
TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf); + + InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat(); + assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class)); + ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true); /* isEmitDelete(boolean) is a setter: turns delete emission on */ + + List<RowData> result = readData(inputFormat); + + final String actual = TestData.rowDataToString(result); + final String expected = "[" + + "id1,Danny,24,1970-01-01T00:00:00.001,par1, " + + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, " + + "id3,null,null,null,null, " + + "id5,null,null,null,null, " + + "id9,null,null,null,null]"; /* deleted keys id3, id5, id9 come back with only the key column set */ + assertThat(actual, is(expected)); + } + @ParameterizedTest @EnumSource(value = HoodieTableType.class) void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception { diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index f8d4e7d..2201aeb 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -224,6 +224,10 @@ public class TestData { funcWrapper.close(); } + private static String toStringSafely(Object obj) { /* same result as String.valueOf(obj) */ + return obj == null ? "null" : obj.toString(); + } + /** * Sort the {@code rows} using field at index 0 and asserts * it equals with the expected string {@code expected}.
@@ -233,7 +237,7 @@ public class TestData { */ public static void assertRowsEquals(List<Row> rows, String expected) { String rowsString = rows.stream() - .sorted(Comparator.comparing(o -> o.getField(0).toString())) + .sorted(Comparator.comparing(o -> toStringSafely(o.getField(0)))) /* null-safe sort key: a null field 0 sorts as the string null */ .collect(Collectors.toList()).toString(); assertThat(rowsString, is(expected)); } @@ -247,7 +251,7 @@ public class TestData { */ public static void assertRowsEquals(List<Row> rows, List<RowData> expected) { String rowsString = rows.stream() - .sorted(Comparator.comparing(o -> o.getField(0).toString())) + .sorted(Comparator.comparing(o -> toStringSafely(o.getField(0)))) /* null-safe sort key: a null field 0 sorts as the string null */ .collect(Collectors.toList()).toString(); assertThat(rowsString, is(rowDataToString(expected))); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java new file mode 100644 index 0000000..dde19b8 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.utils; + +import org.apache.hudi.keygen.KeyGenUtils; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.RowDataToAvroConverters; +import org.apache.hudi.util.StringToRowDataConverter; + +import org.apache.avro.generic.GenericRecord; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.temporal.ChronoField; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +/** + * Test cases for {@link StringToRowDataConverter}. 
+ */ +public class TestStringToRowDataConverter { + @Test + void testConvert() { + String[] fields = new String[] {"1.1", "3.4", "2021-03-30", "56669000", "1617119069000", "12345.67"}; /* 56669000 ms of day = 15:44:29; 1617119069000 epoch ms = 2021-03-30T15:44:29Z */ + LogicalType[] fieldTypes = new LogicalType[] { + DataTypes.FLOAT().getLogicalType(), + DataTypes.DOUBLE().getLogicalType(), + DataTypes.DATE().getLogicalType(), + DataTypes.TIME(3).getLogicalType(), + DataTypes.TIMESTAMP().getLogicalType(), + DataTypes.DECIMAL(7, 2).getLogicalType() + }; + StringToRowDataConverter converter = new StringToRowDataConverter(fieldTypes); + Object[] converted = converter.convert(fields); + Object[] expected = new Object[] { + 1.1f, 3.4D, (int) LocalDate.parse("2021-03-30").toEpochDay(), + LocalTime.parse("15:44:29").get(ChronoField.MILLI_OF_DAY), + TimestampData.fromEpochMillis(Instant.parse("2021-03-30T15:44:29Z").toEpochMilli()), + DecimalData.fromBigDecimal(new BigDecimal("12345.67"), 7, 2) + }; + assertArrayEquals(expected, converted); + } + + @Test + void testRowDataToAvroStringToRowData() { + GenericRowData rowData = new GenericRowData(6); + rowData.setField(0, 1.1f); + rowData.setField(1, 3.4D); + rowData.setField(2, (int) LocalDate.parse("2021-03-30").toEpochDay()); + rowData.setField(3, LocalTime.parse("15:44:29").get(ChronoField.MILLI_OF_DAY)); + rowData.setField(4, TimestampData.fromEpochMillis(Instant.parse("2021-03-30T15:44:29Z").toEpochMilli())); + rowData.setField(5, DecimalData.fromBigDecimal(new BigDecimal("12345.67"), 7, 2)); + + DataType dataType = DataTypes.ROW( + DataTypes.FIELD("f_float", DataTypes.FLOAT()), + DataTypes.FIELD("f_double", DataTypes.DOUBLE()), + DataTypes.FIELD("f_date", DataTypes.DATE()), + DataTypes.FIELD("f_time", DataTypes.TIME(3)), + DataTypes.FIELD("f_timestamp", DataTypes.TIMESTAMP(3)), + DataTypes.FIELD("f_decimal", DataTypes.DECIMAL(7, 2)) + ); + RowType rowType = (RowType) dataType.getLogicalType(); + RowDataToAvroConverters.RowDataToAvroConverter converter = +
RowDataToAvroConverters.createConverter(rowType); + GenericRecord avroRecord = + (GenericRecord) converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData); + StringToRowDataConverter stringToRowDataConverter = + new StringToRowDataConverter(rowType.getChildren().toArray(new LogicalType[0])); + final String recordKey = KeyGenUtils.getRecordKey(avroRecord, rowType.getFieldNames()); /* all six fields form one composite record key */ + final String[] recordKeys = KeyGenUtils.extractRecordKeys(recordKey); /* split back into per-field strings, then converted to internal values below */ + Object[] convertedKeys = stringToRowDataConverter.convert(recordKeys); + + GenericRowData converted = new GenericRowData(6); + for (int i = 0; i < 6; i++) { + converted.setField(i, convertedKeys[i]); + } + assertThat(converted, is(rowData)); + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java index b8d7de6..68e183b 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java @@ -143,14 +143,9 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory { @Override public void invoke(RowData value, SinkFunction.Context context) { - if (value.getRowKind() == RowKind.INSERT) { - Row row = (Row) converter.toExternal(value); - assert row != null; - RESULT.get(taskID).add(row); - } else { - throw new RuntimeException( - "CollectSinkFunction received " + value.getRowKind() + " messages."); - } + Row row = (Row) converter.toExternal(value); + assert row != null; + RESULT.get(taskID).add(row); /* rows of every RowKind are collected now, not only INSERT; NOTE(review): check whether the RowKind import is still used */ } @Override