Re: [PR] [HUDI-6873] fix clustering mor [hudi]
codope merged PR #9774: URL: https://github.com/apache/hudi/pull/9774 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
codope commented on PR #9774: URL: https://github.com/apache/hudi/pull/9774#issuecomment-1758839664

Landing this PR. Test failure is unrelated. The integ test failure should be fixed by #9843
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
hudi-bot commented on PR #9774: URL: https://github.com/apache/hudi/pull/9774#issuecomment-1756457104

## CI report:

* 32ed9c642232d74ff973c0df626223282469fb0d Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20274)

Bot commands
@hudi-bot supports the following commands:
- `@hudi-bot run azure` re-run the last Azure build
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
hudi-bot commented on PR #9774: URL: https://github.com/apache/hudi/pull/9774#issuecomment-1756170043

## CI report:

* 658c987d20c827851b05cd530d3adfda5038df7c Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20231)
* 32ed9c642232d74ff973c0df626223282469fb0d Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20274)
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
hudi-bot commented on PR #9774: URL: https://github.com/apache/hudi/pull/9774#issuecomment-1756112130

## CI report:

* 658c987d20c827851b05cd530d3adfda5038df7c Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20231)
* 32ed9c642232d74ff973c0df626223282469fb0d UNKNOWN
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
the-other-tim-brown commented on code in PR #9774: URL: https://github.com/apache/hudi/pull/9774#discussion_r1353194753

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java: ##
@@ -19,47 +19,80 @@
 package org.apache.hudi.common.table.log;
 
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodiePayloadProps;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 
 import org.apache.avro.Schema;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Properties;
 
-/**
- * Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice.
- */
-public class HoodieFileSliceReader implements Iterator> {
+public class HoodieFileSliceReader extends LogFileIterator {
+  private Option> baseFileIterator;
+  private HoodieMergedLogRecordScanner scanner;
+  private Schema schema;
+  private Properties props;
 
-  private final Iterator> recordsIterator;
+  private TypedProperties payloadProps = new TypedProperties();
+  private Option> simpleKeyGenFieldsOpt;
+  Map records;
+  HoodieRecordMerger merger;
 
-  public static HoodieFileSliceReader getFileSliceReader(
-      Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, Properties props, Option> simpleKeyGenFieldsOpt) throws IOException {
+  public HoodieFileSliceReader(Option baseFileReader,
+      HoodieMergedLogRecordScanner scanner, Schema schema, String preCombineField, HoodieRecordMerger merger,
+      Properties props, Option> simpleKeyGenFieldsOpt) throws IOException {
+    super(scanner);
     if (baseFileReader.isPresent()) {
-      Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
-      while (baseIterator.hasNext()) {
-        scanner.processNextRecord(baseIterator.next().wrapIntoHoodieRecordPayloadWithParams(schema, props,
-            simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionNameOverride(), false, Option.empty()));
-      }
+      this.baseFileIterator = Option.of(baseFileReader.get().getRecordIterator(schema));
+    } else {
+      this.baseFileIterator = Option.empty();
     }
-    return new HoodieFileSliceReader(scanner.iterator());
+    this.scanner = scanner;
+    this.schema = schema;
+    this.merger = merger;
+    if (preCombineField != null) {
+      payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, preCombineField);
+    }
+    this.props = props;
+    this.simpleKeyGenFieldsOpt = simpleKeyGenFieldsOpt;
+    this.records = scanner.getRecords();
   }
 
-  private HoodieFileSliceReader(Iterator> recordsItr) {
-    this.recordsIterator = recordsItr;
+  private Boolean hasNextInternal() {

Review Comment:
use primitive `boolean` here

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java: ##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+
+import java.util.Iterator;
+import java.util.Map;
+
+public class LogFileIterator extends CachingIterator> {
+  HoodieMergedLogRecordScanner scanner;
+  Map records;
+  Iterator iterator;
+
+  protected Option removeLogRecord(String key) {
+    return Option.ofNullable(records.remove(key));
+  }
+
+  public LogFileIterator(HoodieMergedLogRecordScanner scanner) {
+    this.scanner = scanner;
+    this.records = scanner.getRecords();
+  }
+
+  private Boolean hasNextInternal() {

Review Comment:
make this a primitive `boolean`

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java: ##
@@ -0,0 +1,57 @@
+/*
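One concrete reason behind the "use primitive `boolean`" comments: a `Boolean`-returning hook is auto-unboxed inside an expression such as `nextRecord != null || doHasNext()`. A `null` return would therefore surface as a `NullPointerException` inside `hasNext()`. The standalone sketch below illustrates this. `BoxedVsPrimitive` and its methods are hypothetical and not Hudi code, and `java.util.function` suppliers stand in for the abstract method.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical illustration (not Hudi code) of boxed vs. primitive hasNext hooks.
final class BoxedVsPrimitive {
    private BoxedVsPrimitive() {}

    // Same shape as `return nextRecord != null || doHasNext();` with a Boolean hook.
    // doHasNext.get() is auto-unboxed here, so a null result throws NullPointerException.
    static boolean hasNextBoxed(Object nextRecord, Supplier<Boolean> doHasNext) {
        return nextRecord != null || doHasNext.get();
    }

    // With a primitive boolean hook there is no unboxing and no null to worry about.
    static boolean hasNextPrimitive(Object nextRecord, BooleanSupplier doHasNext) {
        return nextRecord != null || doHasNext.getAsBoolean();
    }
}
```

When a record is already cached, `||` short-circuits and the hook is never called. The unboxing hazard only appears on the path that actually consults it.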
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
jonvex commented on code in PR #9774: URL: https://github.com/apache/hudi/pull/9774#discussion_r1352874129

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java: ##
@@ -19,47 +19,80 @@
 package org.apache.hudi.common.table.log;
 
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodiePayloadProps;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 
 import org.apache.avro.Schema;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Properties;
 
-/**
- * Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice.
- */
-public class HoodieFileSliceReader implements Iterator> {
+public class HoodieFileSliceReader extends LogFileIterator {
+  private Option> baseFileIterator;
+  private HoodieMergedLogRecordScanner scanner;
+  private Schema schema;
+  private Properties props;
 
-  private final Iterator> recordsIterator;
+  private TypedProperties payloadProps = new TypedProperties();
+  private Option> simpleKeyGenFieldsOpt;
+  Map records;
+  HoodieRecordMerger merger;
 
-  public static HoodieFileSliceReader getFileSliceReader(
-      Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, Properties props, Option> simpleKeyGenFieldsOpt) throws IOException {
+  public HoodieFileSliceReader(Option baseFileReader,
+      HoodieMergedLogRecordScanner scanner, Schema schema, String preCombineField, HoodieRecordMerger merger,
+      Properties props, Option> simpleKeyGenFieldsOpt) throws IOException {
+    super(scanner);
     if (baseFileReader.isPresent()) {
-      Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
-      while (baseIterator.hasNext()) {
-        scanner.processNextRecord(baseIterator.next().wrapIntoHoodieRecordPayloadWithParams(schema, props,
-            simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionNameOverride(), false, Option.empty()));
-      }
+      this.baseFileIterator = Option.of(baseFileReader.get().getRecordIterator(schema));
+    } else {
+      this.baseFileIterator = Option.empty();
     }
-    return new HoodieFileSliceReader(scanner.iterator());
+    this.scanner = scanner;
+    this.schema = schema;
+    this.merger = merger;
+    if (preCombineField != null) {
+      payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, preCombineField);
+    }
+    this.props = props;
+    this.simpleKeyGenFieldsOpt = simpleKeyGenFieldsOpt;
+    this.records = scanner.getRecords();
   }
 
-  private HoodieFileSliceReader(Iterator> recordsItr) {
-    this.recordsIterator = recordsItr;
+  private Boolean hasNextInternal() {
+    while (baseFileIterator.isPresent() && baseFileIterator.get().hasNext()) {
+      try {
+        HoodieRecord currentRecord = baseFileIterator.get().next().wrapIntoHoodieRecordPayloadWithParams(schema, props,
+            simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionNameOverride(), false, Option.empty());
+        Option logRecord = removeLogRecord(currentRecord.getRecordKey());
+        if (!logRecord.isPresent()) {

Review Comment:
https://issues.apache.org/jira/browse/HUDI-6934

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/CachingIterator.java: ##
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import java.util.Iterator;
+
+public abstract class CachingIterator implements Iterator {
+
+  protected T nextRecord;
+
+  protected abstract Boolean doHasNext();
+
+  @Override
+  public final boolean hasNext() {
+    return nextRecord != null || doHasNext();
+  }
+
+  @Override
+  public final T next() {
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
hudi-bot commented on PR #9774: URL: https://github.com/apache/hudi/pull/9774#issuecomment-1752481582

## CI report:

* 658c987d20c827851b05cd530d3adfda5038df7c Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20231)
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
hudi-bot commented on PR #9774: URL: https://github.com/apache/hudi/pull/9774#issuecomment-1752470130

## CI report:

* 658c987d20c827851b05cd530d3adfda5038df7c UNKNOWN
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
danny0405 commented on code in PR #9774: URL: https://github.com/apache/hudi/pull/9774#discussion_r1349438084

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java: ##
@@ -19,47 +19,80 @@
 package org.apache.hudi.common.table.log;
 
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodiePayloadProps;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 
 import org.apache.avro.Schema;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Properties;
 
-/**
- * Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice.
- */
-public class HoodieFileSliceReader implements Iterator> {
+public class HoodieFileSliceReader extends LogFileIterator {
+  private Option> baseFileIterator;
+  private HoodieMergedLogRecordScanner scanner;
+  private Schema schema;
+  private Properties props;
 
-  private final Iterator> recordsIterator;
+  private TypedProperties payloadProps = new TypedProperties();
+  private Option> simpleKeyGenFieldsOpt;
+  Map records;
+  HoodieRecordMerger merger;
 
-  public static HoodieFileSliceReader getFileSliceReader(
-      Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, Properties props, Option> simpleKeyGenFieldsOpt) throws IOException {
+  public HoodieFileSliceReader(Option baseFileReader,
+      HoodieMergedLogRecordScanner scanner, Schema schema, String preCombineField, HoodieRecordMerger merger,
+      Properties props, Option> simpleKeyGenFieldsOpt) throws IOException {
+    super(scanner);
     if (baseFileReader.isPresent()) {
-      Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
-      while (baseIterator.hasNext()) {
-        scanner.processNextRecord(baseIterator.next().wrapIntoHoodieRecordPayloadWithParams(schema, props,
-            simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionNameOverride(), false, Option.empty()));
-      }
+      this.baseFileIterator = Option.of(baseFileReader.get().getRecordIterator(schema));
+    } else {
+      this.baseFileIterator = Option.empty();
     }
-    return new HoodieFileSliceReader(scanner.iterator());
+    this.scanner = scanner;
+    this.schema = schema;
+    this.merger = merger;
+    if (preCombineField != null) {
+      payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, preCombineField);
+    }
+    this.props = props;
+    this.simpleKeyGenFieldsOpt = simpleKeyGenFieldsOpt;
+    this.records = scanner.getRecords();
   }
 
-  private HoodieFileSliceReader(Iterator> recordsItr) {
-    this.recordsIterator = recordsItr;
+  private Boolean hasNextInternal() {
+    while (baseFileIterator.isPresent() && baseFileIterator.get().hasNext()) {
+      try {
+        HoodieRecord currentRecord = baseFileIterator.get().next().wrapIntoHoodieRecordPayloadWithParams(schema, props,
+            simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionNameOverride(), false, Option.empty());
+        Option logRecord = removeLogRecord(currentRecord.getRecordKey());
+        if (!logRecord.isPresent()) {

Review Comment:
Fine, just file a JIRA to track it.
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
danny0405 commented on code in PR #9774: URL: https://github.com/apache/hudi/pull/9774#discussion_r1349437382

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/CachingIterator.java: ##
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import java.util.Iterator;
+
+public abstract class CachingIterator implements Iterator {
+
+  protected T nextRecord;
+
+  protected abstract Boolean doHasNext();
+
+  @Override
+  public final boolean hasNext() {
+    return nextRecord != null || doHasNext();
+  }
+
+  @Override
+  public final T next() {
+    T record = nextRecord;
+    nextRecord = null;
+    return record;

Review Comment:
Got it. We can file a JIRA to track the improvement in the near future.
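As quoted, `CachingIterator.next()` returns the cached `nextRecord` without first calling `hasNext()`. A caller that invokes `next()` directly therefore gets `null` instead of the next element or the `NoSuchElementException` that the `java.util.Iterator` contract specifies.

The sketch below shows a variant that honors the contract. It is not the merged code, and `CachingIteratorSketch` and `ListCachingIterator` are hypothetical names. Like the original, it uses `null` to mean "nothing cached", so it assumes the underlying source never yields `null` elements.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch of a caching iterator whose next() follows the Iterator contract.
abstract class CachingIteratorSketch<T> implements Iterator<T> {
    // Record fetched by doHasNext() but not yet handed out by next(); null means "empty cache".
    protected T nextRecord;

    // Subclasses advance their source, store the result in nextRecord,
    // and return true if a record was found (primitive boolean, per the review).
    protected abstract boolean doHasNext();

    @Override
    public final boolean hasNext() {
        return nextRecord != null || doHasNext();
    }

    @Override
    public final T next() {
        // Fill the cache if the caller skipped hasNext(); fail loudly when exhausted.
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T record = nextRecord;
        nextRecord = null;
        return record;
    }
}

// Toy subclass over an in-memory list, for illustration only.
final class ListCachingIterator<T> extends CachingIteratorSketch<T> {
    private final Iterator<T> source;

    ListCachingIterator(List<T> items) {
        this.source = items.iterator();
    }

    @Override
    protected boolean doHasNext() {
        if (source.hasNext()) {
            nextRecord = source.next();
            return true;
        }
        return false;
    }
}
```

Because `hasNext()` returns `true` whenever a record is already cached, calling it repeatedly does not skip elements. Only `next()` clears the cache.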
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
danny0405 commented on code in PR #9774: URL: https://github.com/apache/hudi/pull/9774#discussion_r1349437382

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/CachingIterator.java: ##
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import java.util.Iterator;
+
+public abstract class CachingIterator implements Iterator {
+
+  protected T nextRecord;
+
+  protected abstract Boolean doHasNext();
+
+  @Override
+  public final boolean hasNext() {
+    return nextRecord != null || doHasNext();
+  }
+
+  @Override
+  public final T next() {
+    T record = nextRecord;
+    nextRecord = null;
+    return record;

Review Comment:
Got it.
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
hudi-bot commented on PR #9774: URL: https://github.com/apache/hudi/pull/9774#issuecomment-1751300814

## CI report:

* 25d900c216bdeb1e8bd55ce4533f0ce865c27999 UNKNOWN
* 658c987d20c827851b05cd530d3adfda5038df7c Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20231)
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
hudi-bot commented on PR #9774: URL: https://github.com/apache/hudi/pull/9774#issuecomment-1751228833

## CI report:

* 25d900c216bdeb1e8bd55ce4533f0ce865c27999 UNKNOWN
* ea1001ccef41ff55b0fb003bd8d1245800e4b9cd Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20222)
* 658c987d20c827851b05cd530d3adfda5038df7c Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20231)
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
hudi-bot commented on PR #9774: URL: https://github.com/apache/hudi/pull/9774#issuecomment-1751218297

## CI report:

* 25d900c216bdeb1e8bd55ce4533f0ce865c27999 UNKNOWN
* ea1001ccef41ff55b0fb003bd8d1245800e4b9cd Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20222)
* 658c987d20c827851b05cd530d3adfda5038df7c UNKNOWN
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
jonvex commented on code in PR #9774: URL: https://github.com/apache/hudi/pull/9774#discussion_r1349135494

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java: ##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+
+import java.util.Iterator;
+import java.util.Map;
+
+public class LogFileIterator extends CachingIterator> {
+  HoodieMergedLogRecordScanner scanner;
+  Map records;
+  Iterator iterator;
+
+  protected Option removeLogRecord(String key) {
+    if (records.containsKey(key)) {
+      return Option.of(records.remove(key));
+    }
+    return Option.empty();

Review Comment:
cool!
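The quoted hunk shows an earlier `removeLogRecord` that calls `containsKey` and then `remove`. The later revision, quoted in the-other-tim-brown's review, wraps a single `records.remove(key)` in `Option.ofNullable`. This works because `Map.remove` returns the previous value, or `null` when the key is absent.

The sketch below compares the two shapes. It uses `java.util.Optional` as a stand-in for Hudi's `Option`, and the class name is hypothetical.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical comparison of the two removeLogRecord shapes, with java.util.Optional
// standing in for Hudi's Option.
final class RemoveLogRecordSketch {
    private RemoveLogRecordSketch() {}

    // Earlier shape: two hash lookups (containsKey, then remove).
    static <V> Optional<V> removeTwoLookups(Map<String, V> records, String key) {
        if (records.containsKey(key)) {
            return Optional.of(records.remove(key));
        }
        return Optional.empty();
    }

    // Later shape: one lookup. Map.remove returns the old value or null if the key was absent.
    static <V> Optional<V> removeOneLookup(Map<String, V> records, String key) {
        return Optional.ofNullable(records.remove(key));
    }
}
```

The two variants are interchangeable as long as the map never stores `null` values. With `java.util.Optional`, a key mapped to `null` would make the first variant throw and the second return empty.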
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
jonvex commented on code in PR #9774: URL: https://github.com/apache/hudi/pull/9774#discussion_r1349097170

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java: ##
@@ -19,47 +19,80 @@
 package org.apache.hudi.common.table.log;
 
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodiePayloadProps;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 
 import org.apache.avro.Schema;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Properties;
 
-/**
- * Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice.
- */
-public class HoodieFileSliceReader implements Iterator> {
+public class HoodieFileSliceReader extends LogFileIterator {
+  private Option> baseFileIterator;
+  private HoodieMergedLogRecordScanner scanner;
+  private Schema schema;
+  private Properties props;
 
-  private final Iterator> recordsIterator;
+  private TypedProperties payloadProps = new TypedProperties();
+  private Option> simpleKeyGenFieldsOpt;
+  Map records;
+  HoodieRecordMerger merger;
 
-  public static HoodieFileSliceReader getFileSliceReader(
-      Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, Properties props, Option> simpleKeyGenFieldsOpt) throws IOException {
+  public HoodieFileSliceReader(Option baseFileReader,
+      HoodieMergedLogRecordScanner scanner, Schema schema, String preCombineField, HoodieRecordMerger merger,
+      Properties props, Option> simpleKeyGenFieldsOpt) throws IOException {
+    super(scanner);
     if (baseFileReader.isPresent()) {
-      Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
-      while (baseIterator.hasNext()) {
-        scanner.processNextRecord(baseIterator.next().wrapIntoHoodieRecordPayloadWithParams(schema, props,
-            simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionNameOverride(), false, Option.empty()));
-      }
+      this.baseFileIterator = Option.of(baseFileReader.get().getRecordIterator(schema));
+    } else {
+      this.baseFileIterator = Option.empty();
     }
-    return new HoodieFileSliceReader(scanner.iterator());
+    this.scanner = scanner;
+    this.schema = schema;
+    this.merger = merger;
+    if (preCombineField != null) {
+      payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, preCombineField);
+    }
+    this.props = props;
+    this.simpleKeyGenFieldsOpt = simpleKeyGenFieldsOpt;
+    this.records = scanner.getRecords();
   }
 
-  private HoodieFileSliceReader(Iterator> recordsItr) {
-    this.recordsIterator = recordsItr;
+  private Boolean hasNextInternal() {
+    while (baseFileIterator.isPresent() && baseFileIterator.get().hasNext()) {
+      try {
+        HoodieRecord currentRecord = baseFileIterator.get().next().wrapIntoHoodieRecordPayloadWithParams(schema, props,
+            simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionNameOverride(), false, Option.empty());
+        Option logRecord = removeLogRecord(currentRecord.getRecordKey());
+        if (!logRecord.isPresent()) {

Review Comment:
I tried to mimic the implementation from https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala. We will eventually use the HoodieFilegroupReader in both places. I think we should get this fix into 0.14.1 using the same logic we already have. We can discuss improvements to MOR base-file/log merging as part of the HoodieFilegroupReader implementation.
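The quoted `hasNextInternal` streams the base file and, for each base record, removes the matching log record by key. When no log record matches, the base record is emitted unchanged. The in-memory toy below sketches that two-pass merge under stated assumptions. It follows what the class hierarchy suggests, namely that keys present only in the log files are emitted after the base file is exhausted.

The merge policy here is a hypothetical "larger ordering value wins, log wins ties" stand-in for `HoodieRecordMerger`. Deletes and payload semantics are omitted. Unlike the real reader, the sketch builds a list rather than iterating lazily. `SliceRecord` and `FileSliceMergeSketch` are illustrative names, not Hudi classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical toy record: key, ordering value (think preCombine field), payload.
final class SliceRecord {
    final String key;
    final long ordering;
    final String value;

    SliceRecord(String key, long ordering, String value) {
        this.key = key;
        this.ordering = ordering;
        this.value = value;
    }

    @Override
    public String toString() {
        return key + "=" + value;
    }
}

final class FileSliceMergeSketch {
    private FileSliceMergeSketch() {}

    // Assumed merge policy: larger ordering value wins; the log record wins ties.
    static SliceRecord merge(SliceRecord base, SliceRecord log) {
        return log.ordering >= base.ordering ? log : base;
    }

    static List<SliceRecord> mergeSlice(Iterator<SliceRecord> baseFile, Map<String, SliceRecord> logRecords) {
        List<SliceRecord> out = new ArrayList<>();
        // Pass 1: stream the base file; removing the matched log record keeps it from being emitted twice.
        while (baseFile.hasNext()) {
            SliceRecord base = baseFile.next();
            SliceRecord log = logRecords.remove(base.key);
            out.add(log == null ? base : merge(base, log));
        }
        // Pass 2: whatever remains exists only in the log files (e.g. new inserts),
        // emitted in the map's iteration order.
        out.addAll(logRecords.values());
        return out;
    }
}
```

With base records `a`, `b`, `c` and log records `b`, `c`, `d`, the output keeps `a` as is. It merges `b` and `c` by ordering value and appends `d` at the end.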
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
jonvex commented on code in PR #9774: URL: https://github.com/apache/hudi/pull/9774#discussion_r1348872942

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/CachingIterator.java: ##
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import java.util.Iterator;
+
+public abstract class CachingIterator implements Iterator {
+
+  protected T nextRecord;
+
+  protected abstract Boolean doHasNext();
+
+  @Override
+  public final boolean hasNext() {
+    return nextRecord != null || doHasNext();
+  }
+
+  @Override
+  public final T next() {
+    T record = nextRecord;
+    nextRecord = null;
+    return record;

Review Comment:
I tried to mimic https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala as much as possible so maybe
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
hudi-bot commented on PR #9774:
URL: https://github.com/apache/hudi/pull/9774#issuecomment-1749937111

## CI report:

* 25d900c216bdeb1e8bd55ce4533f0ce865c27999 UNKNOWN
* ea1001ccef41ff55b0fb003bd8d1245800e4b9cd Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20222)
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
danny0405 commented on code in PR #9774:
URL: https://github.com/apache/hudi/pull/9774#discussion_r1348166006

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java: ##
@@ -19,47 +19,80 @@
 package org.apache.hudi.common.table.log;
 
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodiePayloadProps;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 
 import org.apache.avro.Schema;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Properties;
 
-/**
- * Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice.
- */
-public class HoodieFileSliceReader implements Iterator> {
+public class HoodieFileSliceReader extends LogFileIterator {
+  private Option> baseFileIterator;
+  private HoodieMergedLogRecordScanner scanner;
+  private Schema schema;
+  private Properties props;
 
-  private final Iterator> recordsIterator;
+  private TypedProperties payloadProps = new TypedProperties();
+  private Option> simpleKeyGenFieldsOpt;
+  Map records;
+  HoodieRecordMerger merger;
 
-  public static HoodieFileSliceReader getFileSliceReader(
-      Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, Properties props, Option> simpleKeyGenFieldsOpt) throws IOException {
+  public HoodieFileSliceReader(Option baseFileReader,
+      HoodieMergedLogRecordScanner scanner, Schema schema, String preCombineField, HoodieRecordMerger merger,
+      Properties props, Option> simpleKeyGenFieldsOpt) throws IOException {
+    super(scanner);
     if (baseFileReader.isPresent()) {
-      Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
-      while (baseIterator.hasNext()) {
-        scanner.processNextRecord(baseIterator.next().wrapIntoHoodieRecordPayloadWithParams(schema, props,
-            simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionNameOverride(), false, Option.empty()));
-      }
+      this.baseFileIterator = Option.of(baseFileReader.get().getRecordIterator(schema));
+    } else {
+      this.baseFileIterator = Option.empty();
     }
-    return new HoodieFileSliceReader(scanner.iterator());
+    this.scanner = scanner;
+    this.schema = schema;
+    this.merger = merger;
+    if (preCombineField != null) {
+      payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, preCombineField);
+    }
+    this.props = props;
+    this.simpleKeyGenFieldsOpt = simpleKeyGenFieldsOpt;
+    this.records = scanner.getRecords();
   }
 
-  private HoodieFileSliceReader(Iterator> recordsItr) {
-    this.recordsIterator = recordsItr;
+  private Boolean hasNextInternal() {
+    while (baseFileIterator.isPresent() && baseFileIterator.get().hasNext()) {
+      try {
+        HoodieRecord currentRecord = baseFileIterator.get().next().wrapIntoHoodieRecordPayloadWithParams(schema, props,
+            simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionNameOverride(), false, Option.empty());
+        Option logRecord = removeLogRecord(currentRecord.getRecordKey());
+        if (!logRecord.isPresent()) {

Review Comment:
Can we remove the log record directly? Imagine we have duplicates in the base file: the duplicates cannot be handled correctly once the log record has been removed.
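To make danny0405's concern concrete, here is a small, self-contained simulation. It is a hypothetical sketch rather than Hudi code: a record is a `{key, value}` pair, a log entry simply replaces the base value for its key, and `DuplicateMergeDemo`, `merge`, and the `matched` set are illustrative names. When the lookup removes the log entry, only the first of two base-file duplicates receives the update and the second keeps its stale value; a non-removing lookup updates both, but then needs separate bookkeeping to know which log-only records are still left to emit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation, not Hudi code: a record is a {key, value} pair and a
// log entry simply replaces the base value for its key.
class DuplicateMergeDemo {

  // Merges base records with log records. With removeOnMatch = true, a log entry
  // is removed from the map the first time a base record with that key is seen.
  static List<String> merge(List<String[]> baseRecords, Map<String, String> logRecords,
                            boolean removeOnMatch) {
    Map<String, String> log = new LinkedHashMap<>(logRecords); // leave the caller's map intact
    Set<String> matched = new HashSet<>();
    List<String> out = new ArrayList<>();
    for (String[] base : baseRecords) {
      String key = base[0];
      String update = removeOnMatch ? log.remove(key) : log.get(key);
      if (update != null) {
        matched.add(key);
      }
      out.add(key + "=" + (update != null ? update : base[1]));
    }
    // Emit log-only records (keys that never appeared in the base file).
    for (Map.Entry<String, String> e : log.entrySet()) {
      if (!matched.contains(e.getKey())) {
        out.add(e.getKey() + "=" + e.getValue());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Two base records share key k1; the log updates k1 and inserts k2.
    List<String[]> base = Arrays.asList(new String[] {"k1", "v1"}, new String[] {"k1", "v1"});
    Map<String, String> log = new LinkedHashMap<>();
    log.put("k1", "v2");
    log.put("k2", "v9");

    System.out.println(merge(base, log, true));  // [k1=v2, k1=v1, k2=v9]
    System.out.println(merge(base, log, false)); // [k1=v2, k1=v2, k2=v9]
  }
}
```

Whether duplicate keys in a base file should survive clustering at all is a separate question; the sketch only shows that removing on first match ties correctness to each key appearing once in the base file.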
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
danny0405 commented on code in PR #9774:
URL: https://github.com/apache/hudi/pull/9774#discussion_r1348164155

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/CachingIterator.java: ##
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import java.util.Iterator;
+
+public abstract class CachingIterator<T> implements Iterator<T> {
+
+  protected T nextRecord;
+
+  protected abstract Boolean doHasNext();
+
+  @Override
+  public final boolean hasNext() {
+    return nextRecord != null || doHasNext();
+  }
+
+  @Override
+  public final T next() {
+    T record = nextRecord;
+    nextRecord = null;
+    return record;

Review Comment:
That means `doHasNext` must set up `nextRecord` correctly, which is not very straightforward. Do we really need this `CachingIterator`?
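The contract danny0405 describes can be seen in a minimal sketch. The abstract class restates the quoted `CachingIterator` with its type parameter written out; `EvenNumbers` and `CachingIteratorDemo` are hypothetical names used only for illustration. `doHasNext()` must both report whether an element exists and store that element in `nextRecord`, because `next()` only hands back whatever is cached.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Restatement of the quoted CachingIterator, with its type parameter written out.
abstract class CachingIterator<T> implements Iterator<T> {
  protected T nextRecord;

  // Implicit contract: return true only after storing the upcoming element in nextRecord.
  protected abstract Boolean doHasNext();

  @Override
  public final boolean hasNext() {
    return nextRecord != null || doHasNext();
  }

  @Override
  public final T next() {
    T record = nextRecord;
    nextRecord = null;
    return record;
  }
}

// Hypothetical subclass: yields only the even numbers of an underlying iterator.
class EvenNumbers extends CachingIterator<Integer> {
  private final Iterator<Integer> source;

  EvenNumbers(Iterator<Integer> source) {
    this.source = source;
  }

  @Override
  protected Boolean doHasNext() {
    while (source.hasNext()) {
      Integer candidate = source.next();
      if (candidate % 2 == 0) {
        nextRecord = candidate; // the side effect hasNext()/next() silently depend on
        return true;
      }
    }
    return false;
  }
}

class CachingIteratorDemo {
  static <T> List<T> drain(Iterator<T> it) {
    List<T> out = new ArrayList<>();
    while (it.hasNext()) {
      out.add(it.next());
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(drain(new EvenNumbers(Arrays.asList(1, 2, 3, 4, 5, 6).iterator()))); // [2, 4, 6]
    // next() without a preceding hasNext() returns null instead of advancing.
    System.out.println(new EvenNumbers(Arrays.asList(2).iterator()).next()); // null
  }
}
```

Returning `true` from `doHasNext()` without assigning `nextRecord` would make `hasNext()` promise an element that `next()` then returns as `null`. Note also that, unlike the `java.util.Iterator` contract, the quoted `next()` returns `null` instead of throwing `NoSuchElementException` when nothing is cached.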
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
danny0405 commented on code in PR #9774:
URL: https://github.com/apache/hudi/pull/9774#discussion_r1348162604

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java: ##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+
+import java.util.Iterator;
+import java.util.Map;
+
+public class LogFileIterator extends CachingIterator> {
+  HoodieMergedLogRecordScanner scanner;
+  Map records;
+  Iterator iterator;
+
+  protected Option removeLogRecord(String key) {
+    if (records.containsKey(key)) {
+      return Option.of(records.remove(key));
+    }
+    return Option.empty();

Review Comment:
Can be simplified as `return Option.ofNullable(records.remove(key));`
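The suggested one-liner works because `Map.remove(key)` returns the value previously mapped to `key`, or `null` if there was no mapping, so `ofNullable` covers both branches with a single map operation instead of `containsKey` followed by `remove`. The sketch below uses `java.util.Optional` as a stand-in for Hudi's `Option` so that it runs without Hudi on the classpath; `RemoveLogRecordDemo` is a hypothetical name. The two variants agree as long as the map never stores `null` values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch using java.util.Optional in place of Hudi's Option.
class RemoveLogRecordDemo {
  // Shape of the reviewed code: containsKey followed by remove (two lookups).
  static Optional<String> removeVerbose(Map<String, String> records, String key) {
    if (records.containsKey(key)) {
      return Optional.of(records.remove(key));
    }
    return Optional.empty();
  }

  // Suggested shape: Map.remove already returns null for a missing key.
  static Optional<String> removeConcise(Map<String, String> records, String key) {
    return Optional.ofNullable(records.remove(key));
  }

  public static void main(String[] args) {
    Map<String, String> records = new HashMap<>();
    records.put("k1", "v1");
    System.out.println(removeConcise(records, "k1")); // Optional[v1]
    System.out.println(removeConcise(records, "k1")); // Optional.empty
  }
}
```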
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
hudi-bot commented on PR #9774:
URL: https://github.com/apache/hudi/pull/9774#issuecomment-1749833302

## CI report:

* 25d900c216bdeb1e8bd55ce4533f0ce865c27999 UNKNOWN
* 9e6c98f0260ec12a245e71cf74288d4b876c6ccb Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20220)
* ea1001ccef41ff55b0fb003bd8d1245800e4b9cd Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20222)
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
hudi-bot commented on PR #9774:
URL: https://github.com/apache/hudi/pull/9774#issuecomment-1749828021

## CI report:

* 25d900c216bdeb1e8bd55ce4533f0ce865c27999 UNKNOWN
* 9800e0fd66487b8deb185af490952ec14ec3a40b Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20143)
* 9e6c98f0260ec12a245e71cf74288d4b876c6ccb Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20220)
* ea1001ccef41ff55b0fb003bd8d1245800e4b9cd UNKNOWN
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
hudi-bot commented on PR #9774:
URL: https://github.com/apache/hudi/pull/9774#issuecomment-1749683846

## CI report:

* 25d900c216bdeb1e8bd55ce4533f0ce865c27999 UNKNOWN
* 9800e0fd66487b8deb185af490952ec14ec3a40b Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20143)
* 9e6c98f0260ec12a245e71cf74288d4b876c6ccb Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20220)
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
jonvex commented on PR #9774:
URL: https://github.com/apache/hudi/pull/9774#issuecomment-1749673213

> > if they have the same precombine, the base file records will be chosen over the log file records.
>
> Can you show me where this is happening in the current code? As far as I know, clustering will use the merged log record reader, which honors the payload.

In HoodieFileSliceReader:

```
public static HoodieFileSliceReader getFileSliceReader(
    Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, Properties props, Option> simpleKeyGenFieldsOpt) throws IOException {
  if (baseFileReader.isPresent()) {
    Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
    while (baseIterator.hasNext()) {
      scanner.processNextRecord(baseIterator.next().wrapIntoHoodieRecordPayloadWithParams(schema, props,
          simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionNameOverride(), false, Option.empty()));
    }
  }
  return new HoodieFileSliceReader(scanner.iterator());
}
```

In HoodieMergedLogRecordScanner:

```
@Override
public void processNextRecord(HoodieRecord newRecord) throws IOException {
  String key = newRecord.getRecordKey();
  HoodieRecord prevRecord = records.get(key);
  if (prevRecord != null) {
    // Merge and store the combined record
    HoodieRecord combinedRecord = (HoodieRecord) recordMerger.merge(prevRecord, readerSchema,
        newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
    // If pre-combine returns existing record, no need to update it
    if (combinedRecord.getData() != prevRecord.getData()) {
      HoodieRecord latestHoodieRecord =
          combinedRecord.newInstance(new HoodieKey(key, newRecord.getPartitionPath()), newRecord.getOperation());

      latestHoodieRecord.unseal();
      latestHoodieRecord.setCurrentLocation(newRecord.getCurrentLocation());
      latestHoodieRecord.seal();

      // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
      //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
      //       it since these records will be put into records(Map).
      records.put(key, latestHoodieRecord.copy());
    }
  } else {
    // Put the record as is
    // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
    //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
    //       it since these records will be put into records(Map).
    records.put(key, newRecord.copy());
  }
}
```

This is fundamentally wrong because the base file records are added to the scanner as new records after all the log files have been scanned. The record merger then treats them as the newer records.
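The ordering problem jonvex describes can be reproduced with a toy model. This is a hypothetical sketch, not Hudi's `HoodieRecordMerger`: each record carries a value and an ordering (precombine) value, and `merge(previous, incoming)` keeps `previous` only when its ordering value is strictly greater, so on a tie the incoming record wins. That tie rule is an assumption of the model, and `MergeOrderDemo` and `Rec` are illustrative names. Feeding the base record in after the log record, as `getFileSliceReader` does through `processNextRecord`, lets the stale base value win a tie; treating the base record as the older side keeps the log update.

```java
// Hypothetical model of precombine-style merging; not Hudi's HoodieRecordMerger.
class MergeOrderDemo {
  static final class Rec {
    final String value;
    final long orderingVal;

    Rec(String value, long orderingVal) {
      this.value = value;
      this.orderingVal = orderingVal;
    }
  }

  // Keeps the previous record only if its ordering value is strictly greater;
  // on a tie the incoming (supposedly newer) record wins.
  static Rec merge(Rec previous, Rec incoming) {
    return previous.orderingVal > incoming.orderingVal ? previous : incoming;
  }

  public static void main(String[] args) {
    Rec base = new Rec("stale-base-value", 1L);
    Rec logUpdate = new Rec("log-update", 1L); // same precombine value

    // Old flow: log records are scanned first, then each base record is
    // processed as if it were the newer record.
    Rec oldFlow = merge(logUpdate, base);
    // Fixed flow: the base record is the older side of the merge.
    Rec fixedFlow = merge(base, logUpdate);

    System.out.println(oldFlow.value);   // stale-base-value
    System.out.println(fixedFlow.value); // log-update
  }
}
```

With distinct precombine values the two orders give the same result, which is consistent with the bug only appearing when base and log records share a precombine value, the case `testClusteringSamePrecombine` exercises.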
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
hudi-bot commented on PR #9774:
URL: https://github.com/apache/hudi/pull/9774#issuecomment-1749669018

## CI report:

* 25d900c216bdeb1e8bd55ce4533f0ce865c27999 UNKNOWN
* 9800e0fd66487b8deb185af490952ec14ec3a40b Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20143)
* 9e6c98f0260ec12a245e71cf74288d4b876c6ccb UNKNOWN
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
jonvex commented on code in PR #9774:
URL: https://github.com/apache/hudi/pull/9774#discussion_r1347994369

## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala: ##
@@ -996,6 +996,48 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
       .save(basePath)
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
+  def testClusteringSamePrecombine(recordType: HoodieRecordType): Unit = {
+    var writeOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      DataSourceWriteOptions.TABLE_TYPE.key()-> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+      "hoodie.clustering.inline"-> "true",
+      "hoodie.clustering.inline.max.commits" -> "2",
+      "hoodie.clustering.plan.strategy.sort.columns" -> "_row_key",
+      "hoodie.metadata.enable" -> "false",
+      "hoodie.datasource.write.row.writer.enable" -> "false"
+    )
+    if (recordType.equals(HoodieRecordType.SPARK)) {
+      writeOpts = Map(HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> classOf[HoodieSparkRecordMerger].getName,
+        HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet") ++ writeOpts
+    }
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 10)).asScala
+    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(writeOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 5)).asScala

Review Comment:
OK, added a test case with deletes. Good that you pointed this out, because my solution didn't handle delete blocks.
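jonvex notes that the first version of the fix did not handle delete blocks. The requirement can be sketched as follows; this is a hypothetical model, not Hudi's implementation. Each key's log state is assumed to be already resolved to either an upsert (`Optional.of(value)`) or a delete (`Optional.empty()`), and `DeleteAwareMergeDemo` is an illustrative name. A delete must drop the matching base record, and a delete for a key that never reached the base file must not be emitted as a record either.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch, not Hudi code. Each key's log state is already resolved:
// Optional.of(value) means upsert, Optional.empty() means delete.
class DeleteAwareMergeDemo {
  static List<String> merge(List<String[]> baseRecords, Map<String, Optional<String>> logRecords) {
    Set<String> seenInBase = new HashSet<>();
    List<String> out = new ArrayList<>();
    for (String[] base : baseRecords) {
      String key = base[0];
      seenInBase.add(key);
      Optional<String> logEntry = logRecords.get(key);
      if (logEntry == null) {
        out.add(key + "=" + base[1]);        // no log entry: keep the base record
      } else if (logEntry.isPresent()) {
        out.add(key + "=" + logEntry.get()); // updated in the log
      }
      // else: deleted in the log, so the base record is dropped
    }
    // Log-only keys: emit inserts, skip deletes of keys the base file never had.
    for (Map.Entry<String, Optional<String>> e : logRecords.entrySet()) {
      if (!seenInBase.contains(e.getKey())) {
        e.getValue().ifPresent(v -> out.add(e.getKey() + "=" + v));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String[]> base = Arrays.asList(
        new String[] {"k1", "v1"}, new String[] {"k2", "v2"}, new String[] {"k3", "v3"});
    Map<String, Optional<String>> log = new LinkedHashMap<>();
    log.put("k1", Optional.of("v1-new")); // update
    log.put("k2", Optional.empty());      // delete of a base record
    log.put("k4", Optional.of("v4"));     // insert
    log.put("k5", Optional.empty());      // delete of a key not in the base file
    System.out.println(merge(base, log)); // [k1=v1-new, k3=v3, k4=v4]
  }
}
```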
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
codope commented on code in PR #9774:
URL: https://github.com/apache/hudi/pull/9774#discussion_r1347867388

## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala: ##
@@ -996,6 +996,48 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
       .save(basePath)
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
+  def testClusteringSamePrecombine(recordType: HoodieRecordType): Unit = {
+    var writeOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      DataSourceWriteOptions.TABLE_TYPE.key()-> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+      "hoodie.clustering.inline"-> "true",
+      "hoodie.clustering.inline.max.commits" -> "2",
+      "hoodie.clustering.plan.strategy.sort.columns" -> "_row_key",
+      "hoodie.metadata.enable" -> "false",
+      "hoodie.datasource.write.row.writer.enable" -> "false"
+    )
+    if (recordType.equals(HoodieRecordType.SPARK)) {
+      writeOpts = Map(HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> classOf[HoodieSparkRecordMerger].getName,
+        HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet") ++ writeOpts
+    }
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 10)).asScala
+    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(writeOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 5)).asScala

Review Comment:
Can we add a test (or extend this one) where some deletes are followed by clustering?
Re: [PR] [HUDI-6873] fix clustering mor [hudi]
jonvex commented on PR #9774:
URL: https://github.com/apache/hudi/pull/9774#issuecomment-1749098564

@codope @danny0405 could you please review this?