phet commented on code in PR #3768: URL: https://github.com/apache/gobblin/pull/3768#discussion_r1323910931
########## gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/Histogram.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.salesforce; + +import java.util.ArrayList; +import java.util.List; +import lombok.Getter; + + +@Getter +public class Histogram { Review Comment: needs javadoc ########## gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/HistogramGroup.java: ########## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.salesforce; + +import lombok.AllArgsConstructor; +import lombok.Getter; + + +@Getter +@AllArgsConstructor +class HistogramGroup { Review Comment: also needs javadoc. I might prefer to make `Group` a static inner class of `Histogram` ########## gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java: ########## @@ -91,35 +75,23 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { public static final String USE_ALL_OBJECTS = "use.all.objects"; public static final boolean DEFAULT_USE_ALL_OBJECTS = false; - private static final String ENABLE_DYNAMIC_PROBING = "salesforce.enableDynamicProbing"; - private static final String DYNAMIC_PROBING_LIMIT = "salesforce.dynamicProbingLimit"; - private static final int DEFAULT_DYNAMIC_PROBING_LIMIT = 1000; - private static final String MIN_TARGET_PARTITION_SIZE = "salesforce.minTargetPartitionSize"; - private static final int DEFAULT_MIN_TARGET_PARTITION_SIZE = 250000; - // this is used to generate histogram buckets smaller than the target partition size to allow for more even - // packing of the generated partitions - private static final String PROBE_TARGET_RATIO = "salesforce.probeTargetRatio"; - private static final double DEFAULT_PROBE_TARGET_RATIO = 0.60; - private static final int MIN_SPLIT_TIME_MILLIS = 1000; - - private static final String DAY_PARTITION_QUERY_TEMPLATE = - "SELECT count(${column}) cnt, DAY_ONLY(${column}) time FROM ${table} " + "WHERE ${column} ${greater} ${start}" - + " AND ${column} ${less} ${end} GROUP BY DAY_ONLY(${column}) ORDER BY DAY_ONLY(${column})"; - private static final String PROBE_PARTITION_QUERY_TEMPLATE = "SELECT count(${column}) cnt FROM ${table} " - + "WHERE ${column} ${greater} ${start} AND ${column} ${less} ${end}"; - - private static final String ENABLE_DYNAMIC_PARTITIONING = 
"salesforce.enableDynamicPartitioning"; - private static final String EARLY_STOP_TOTAL_RECORDS_LIMIT = "salesforce.earlyStopTotalRecordsLimit"; + @VisibleForTesting + static final String ENABLE_DYNAMIC_PROBING = "salesforce.enableDynamicProbing"; + static final String MIN_TARGET_PARTITION_SIZE = "salesforce.minTargetPartitionSize"; Review Comment: are these config props to be provided at job invocation? if so, should be `public` ########## gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java: ########## @@ -277,11 +261,13 @@ private List<WorkUnit> createWorkUnits( } /** - * + * Generates {@link WorkUnit}s based on a bunch of config values like max number of partitions, early stop, + * dynamic partitioning, dynamic probing, etc. */ - private List<WorkUnit> generateWorkUnitsStrategy(SourceEntity sourceEntity, SourceState state, long previousWatermark) { - Boolean disableSoft = state.getPropAsBoolean(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED, false); - log.info("disable soft delete pull: " + disableSoft); + @VisibleForTesting + List<WorkUnit> generateWorkUnitsHelper(SourceEntity sourceEntity, SourceState state, long previousWatermark) { + boolean isSoftDeletePullDisabled = state.getPropAsBoolean(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED, false); + log.info("disable soft delete pull: " + isSoftDeletePullDisabled); Review Comment: I may not understand the intent behind `SfConfig`... but I expected this to be encapsulated within ########## gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceHistogramService.java: ########## @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.salesforce; + +import com.google.common.math.DoubleMath; +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import java.math.RoundingMode; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.text.StrSubstitutor; +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.source.extractor.DataRecordException; +import org.apache.gobblin.source.extractor.exception.RestApiClientException; +import org.apache.gobblin.source.extractor.exception.RestApiConnectionException; +import org.apache.gobblin.source.extractor.exception.RestApiProcessingException; +import org.apache.gobblin.source.extractor.extract.Command; +import org.apache.gobblin.source.extractor.extract.CommandOutput; +import org.apache.gobblin.source.extractor.extract.restapi.RestApiConnector; +import org.apache.gobblin.source.extractor.partition.Partition; +import org.apache.gobblin.source.extractor.partition.Partitioner; +import org.apache.gobblin.source.extractor.utils.Utils; + +import static 
org.apache.gobblin.configuration.ConfigurationKeys.*; + + +/** + * This class encapsulates everything related to histogram calculation for Salesforce. A histogram here refers to a + * mapping of number of records to be fetched by time intervals. Review Comment: is the histogram for per-record last modified time buckets? and is this sketch of the time distribution used to inform "multi partitioning"? while that is also worth describing via javadoc, it's probably best to clearly capture the nature of the histogram in the class name, e.g. `gobblin.salesforce.RecordModTimeHistogramService` ########## gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java: ########## @@ -155,19 +133,25 @@ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity } @Override protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) { - List<WorkUnit> workUnits = null; - workUnitConf = new SfConfig(state.getProperties()); + SalesforceConnector connector = getConnector(state); Review Comment: if used only to initialize the histogram service, scope it to within that path of the conditional ########## gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java: ########## @@ -316,8 +305,8 @@ private List<WorkUnit> generateWorkUnitsStrategy(SourceEntity sourceEntity, Sour histogramAdjust = new Histogram(); for (HistogramGroup group : histogram.getGroups()) { histogramAdjust.add(group); - if (histogramAdjust.getTotalRecordCount() > state - .getPropAsLong(EARLY_STOP_TOTAL_RECORDS_LIMIT, DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT)) { + long earlyStopRecordLimit = state.getPropAsLong(EARLY_STOP_TOTAL_RECORDS_LIMIT, DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT); Review Comment: also for `SfConfig` ########## gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/Histogram.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.salesforce; + +import java.util.ArrayList; +import java.util.List; +import lombok.Getter; + + +@Getter +public class Histogram { + private long totalRecordCount; + private final List<HistogramGroup> groups; + + Histogram() { Review Comment: package-protected is quite uncommon in the project, and certainly not "the default". if you're specifically choosing that, please explain the reasoning in the class-level javadoc. 
otherwise, make this `public`, `protected`, whatever ########## gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java: ########## @@ -292,10 +278,12 @@ private List<WorkUnit> generateWorkUnitsStrategy(SourceEntity sourceEntity, Sour int minTargetPartitionSize = state.getPropAsInt(MIN_TARGET_PARTITION_SIZE, DEFAULT_MIN_TARGET_PARTITION_SIZE); // Only support time related watermark - if (watermarkType == WatermarkType.SIMPLE || Strings.isNullOrEmpty(watermarkColumn) || !state.getPropAsBoolean( - ENABLE_DYNAMIC_PARTITIONING)) { + if (watermarkType == WatermarkType.SIMPLE + || Strings.isNullOrEmpty(watermarkColumn) + || !state.getPropAsBoolean(ENABLE_DYNAMIC_PARTITIONING)) { Review Comment: also this within `SfConfig` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
