[ https://issues.apache.org/jira/browse/GOBBLIN-1904?focusedWorklogId=880096&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-880096 ]

ASF GitHub Bot logged work on GOBBLIN-1904:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Sep/23 04:40
            Start Date: 13/Sep/23 04:40
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3768:
URL: https://github.com/apache/gobblin/pull/3768#discussion_r1323910931


##########
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/Histogram.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.salesforce;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Getter;
+
+
+@Getter
+public class Histogram {

Review Comment:
   needs javadoc



##########
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/HistogramGroup.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.salesforce;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+
+@Getter
+@AllArgsConstructor
+class HistogramGroup {

Review Comment:
   also needs javadoc.
   
   I might prefer to make `Group` a static inner class of `Histogram`
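A minimal sketch of how both comments could be addressed together: `Group` nested as a static class of `Histogram`, with class-level javadoc. It assumes a group holds a bucket key and a record count. The names `key` and `count` are assumptions, because the diff does not show `HistogramGroup`'s fields. Lombok is dropped so the sketch compiles on its own; in the PR the getters would stay `@Getter`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * An ordered histogram of record counts per bucket (for example, per day of the
 * watermark column). The running total is updated on every {@link #add(Group)}.
 */
class Histogram {

  /** One histogram bucket: a bucket key and the number of records that fall in it. */
  public static final class Group {
    private final String key;  // assumed field, e.g. "2023-09-01"
    private final int count;   // assumed field: record count for this bucket

    public Group(String key, int count) {
      this.key = key;
      this.count = count;
    }

    public String getKey() {
      return key;
    }

    public int getCount() {
      return count;
    }
  }

  private long totalRecordCount;
  private final List<Group> groups = new ArrayList<>();

  /** Creates an empty histogram. */
  public Histogram() {
  }

  /** Appends a bucket and adds its count to the running total. */
  public void add(Group group) {
    groups.add(group);
    totalRecordCount += group.getCount();
  }

  public long getTotalRecordCount() {
    return totalRecordCount;
  }

  /** Read-only view, so callers cannot change the groups without updating the total. */
  public List<Group> getGroups() {
    return Collections.unmodifiableList(groups);
  }
}
```

Callers would then write `Histogram.Group`, which keeps the bucket type next to the only class that aggregates it.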



##########
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java:
##########
@@ -91,35 +75,23 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
   public static final String USE_ALL_OBJECTS = "use.all.objects";
   public static final boolean DEFAULT_USE_ALL_OBJECTS = false;
 
-  private static final String ENABLE_DYNAMIC_PROBING = "salesforce.enableDynamicProbing";
-  private static final String DYNAMIC_PROBING_LIMIT = "salesforce.dynamicProbingLimit";
-  private static final int DEFAULT_DYNAMIC_PROBING_LIMIT = 1000;
-  private static final String MIN_TARGET_PARTITION_SIZE = "salesforce.minTargetPartitionSize";
-  private static final int DEFAULT_MIN_TARGET_PARTITION_SIZE = 250000;
-  // this is used to generate histogram buckets smaller than the target partition size to allow for more even
-  // packing of the generated partitions
-  private static final String PROBE_TARGET_RATIO = "salesforce.probeTargetRatio";
-  private static final double DEFAULT_PROBE_TARGET_RATIO = 0.60;
-  private static final int MIN_SPLIT_TIME_MILLIS = 1000;
-
-  private static final String DAY_PARTITION_QUERY_TEMPLATE =
-      "SELECT count(${column}) cnt, DAY_ONLY(${column}) time FROM ${table} " + "WHERE ${column} ${greater} ${start}"
-          + " AND ${column} ${less} ${end} GROUP BY DAY_ONLY(${column}) ORDER BY DAY_ONLY(${column})";
-  private static final String PROBE_PARTITION_QUERY_TEMPLATE = "SELECT count(${column}) cnt FROM ${table} "
-      + "WHERE ${column} ${greater} ${start} AND ${column} ${less} ${end}";
-
-  private static final String ENABLE_DYNAMIC_PARTITIONING = "salesforce.enableDynamicPartitioning";
-  private static final String EARLY_STOP_TOTAL_RECORDS_LIMIT = "salesforce.earlyStopTotalRecordsLimit";
+  @VisibleForTesting
+  static final String ENABLE_DYNAMIC_PROBING = "salesforce.enableDynamicProbing";
+  static final String MIN_TARGET_PARTITION_SIZE = "salesforce.minTargetPartitionSize";

Review Comment:
   are these config props to be provided at job invocation?  if so, should be `public`
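If these keys are meant to be set at job invocation, a minimal sketch of the public form follows. The key strings and the 250000 default come from the diff. The holder class `SalesforceSourceKeys` and the helper `minTargetPartitionSize` are hypothetical, and plain `java.util.Properties` stands in for Gobblin's `SourceState`.

```java
import java.util.Properties;

/**
 * Hypothetical holder for job-level Salesforce keys. Keys that users set at job
 * invocation are public so job configs, docs and tests can reference them directly.
 */
final class SalesforceSourceKeys {
  private SalesforceSourceKeys() {
  }

  /** Job config flag that turns on dynamic probing. */
  public static final String ENABLE_DYNAMIC_PROBING = "salesforce.enableDynamicProbing";
  /** Job config value for the minimum target partition size. */
  public static final String MIN_TARGET_PARTITION_SIZE = "salesforce.minTargetPartitionSize";
  public static final int DEFAULT_MIN_TARGET_PARTITION_SIZE = 250000;

  /** Reads the minimum target partition size, falling back to the default when unset. */
  public static int minTargetPartitionSize(Properties jobProps) {
    String raw = jobProps.getProperty(MIN_TARGET_PARTITION_SIZE);
    return raw == null ? DEFAULT_MIN_TARGET_PARTITION_SIZE : Integer.parseInt(raw.trim());
  }
}
```

Public constants let job configs and tests refer to `SalesforceSourceKeys.MIN_TARGET_PARTITION_SIZE` instead of repeating the raw string. They would also make `@VisibleForTesting` unnecessary on these fields, because the wider visibility would no longer exist only for tests.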



##########
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java:
##########
@@ -277,11 +261,13 @@ private List<WorkUnit> createWorkUnits(
   }
 
   /**
-   *
+   * Generates {@link WorkUnit}s based on a bunch of config values like max number of partitions, early stop,
+   * dynamic partitioning, dynamic probing, etc.
    */
-  private List<WorkUnit> generateWorkUnitsStrategy(SourceEntity sourceEntity, SourceState state, long previousWatermark) {
-    Boolean disableSoft = state.getPropAsBoolean(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED, false);
-    log.info("disable soft delete pull: " + disableSoft);
+  @VisibleForTesting
+  List<WorkUnit> generateWorkUnitsHelper(SourceEntity sourceEntity, SourceState state, long previousWatermark) {
+    boolean isSoftDeletePullDisabled = state.getPropAsBoolean(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED, false);
+    log.info("disable soft delete pull: " + isSoftDeletePullDisabled);

Review Comment:
   I may not understand the intent behind `SfConfig`... but I expected this to be encapsulated within it



##########
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceHistogramService.java:
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.salesforce;
+
+import com.google.common.math.DoubleMath;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.text.StrSubstitutor;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.exception.RestApiClientException;
+import org.apache.gobblin.source.extractor.exception.RestApiConnectionException;
+import org.apache.gobblin.source.extractor.exception.RestApiProcessingException;
+import org.apache.gobblin.source.extractor.extract.Command;
+import org.apache.gobblin.source.extractor.extract.CommandOutput;
+import org.apache.gobblin.source.extractor.extract.restapi.RestApiConnector;
+import org.apache.gobblin.source.extractor.partition.Partition;
+import org.apache.gobblin.source.extractor.partition.Partitioner;
+import org.apache.gobblin.source.extractor.utils.Utils;
+
+import static org.apache.gobblin.configuration.ConfigurationKeys.*;
+
+
+/**
+ * This class encapsulates everything related to histogram calculation for Salesforce. A histogram here refers to a
+ * mapping of number of records to be fetched by time intervals.

Review Comment:
   is the histogram for per-record last modified time buckets?  and is this sketch of the time distribution used to inform "multi partitioning"?
   
   while also worth describing via javadoc, probably best to clearly describe the nature of the histogram in the class name, e.g. `gobblin.salesforce.RecordModTimeHistogramService`
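To make the reviewer's question concrete: assuming the histogram counts records per day of the watermark column (for example a last-modified timestamp), its shape is a sorted map from day to record count. The removed `DAY_PARTITION_QUERY_TEMPLATE` computes this on the Salesforce side with `DAY_ONLY(...)` and `GROUP BY`. The hypothetical helper below does the same bucketing in memory purely to illustrate the data, and bucketing by UTC day is an assumption.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of a record-modification-time histogram. */
final class RecordModTimeHistogram {
  private RecordModTimeHistogram() {
  }

  /** Counts timestamps per UTC calendar day; TreeMap keeps the days in ascending order. */
  static Map<LocalDate, Integer> byDay(List<Instant> lastModifiedTimes) {
    Map<LocalDate, Integer> buckets = new TreeMap<>();
    for (Instant t : lastModifiedTimes) {
      LocalDate day = t.atZone(ZoneOffset.UTC).toLocalDate();
      buckets.merge(day, 1, Integer::sum);
    }
    return buckets;
  }
}
```

A name like `RecordModTimeHistogramService` would tell the reader that the keys are time buckets and the values are record counts without opening the javadoc.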



##########
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java:
##########
@@ -155,19 +133,25 @@ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity
   }
   @Override
   protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) {
-    List<WorkUnit> workUnits = null;
-    workUnitConf = new SfConfig(state.getProperties());
+    SalesforceConnector connector = getConnector(state);

Review Comment:
   if used only to initialize the histogram service, scope it to within that path of the conditional
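A minimal sketch of the suggested scoping. The names `WorkUnitPlanner`, `Connector`, `fetchHistogramBuckets` and `plan` are hypothetical stand-ins for `SalesforceSource`, `SalesforceConnector` and `generateWorkUnits`. Passing a `Supplier` also lets a test count how often the connector is built.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical planner: the connector is created only on the path that needs it. */
final class WorkUnitPlanner {

  /** Stand-in for SalesforceConnector; only the histogram path talks to it. */
  interface Connector {
    List<String> fetchHistogramBuckets();
  }

  private final Supplier<Connector> connectorFactory;

  WorkUnitPlanner(Supplier<Connector> connectorFactory) {
    this.connectorFactory = connectorFactory;
  }

  List<String> plan(boolean dynamicPartitioningEnabled) {
    if (!dynamicPartitioningEnabled) {
      // Static partitioning path: no connector is ever constructed.
      return Collections.singletonList("single-partition");
    }
    // Histogram path: the connector's scope is limited to the branch that uses it.
    Connector connector = connectorFactory.get();
    return connector.fetchHistogramBuckets();
  }
}
```

The static path never pays for connector construction, and a counting supplier in a test makes that property checkable.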



##########
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java:
##########
@@ -316,8 +305,8 @@ private List<WorkUnit> generateWorkUnitsStrategy(SourceEntity sourceEntity, Sour
       histogramAdjust = new Histogram();
       for (HistogramGroup group : histogram.getGroups()) {
         histogramAdjust.add(group);
-        if (histogramAdjust.getTotalRecordCount() > state
-            .getPropAsLong(EARLY_STOP_TOTAL_RECORDS_LIMIT, DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT)) {
+        long earlyStopRecordLimit = state.getPropAsLong(EARLY_STOP_TOTAL_RECORDS_LIMIT, DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT);

Review Comment:
   also for `SfConfig`
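In the new lines, `earlyStopRecordLimit` is still read from `state` on every loop iteration. The following minimal sketch resolves the limit once, either in the caller or in a config object such as `SfConfig`, and then runs the early-stop loop. It assumes the loop `break`s once the running total exceeds the limit, since the diff cuts off before the loop body ends. Plain bucket counts replace `HistogramGroup`, and `EarlyStop.truncate` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for the early-stop step of histogram adjustment. */
final class EarlyStop {
  private EarlyStop() {
  }

  /**
   * Copies bucket counts in order until the running total first exceeds the limit.
   * The bucket that crosses the limit is kept, mirroring the add-then-check order in the diff.
   */
  static List<Long> truncate(List<Long> bucketCounts, long earlyStopRecordLimit) {
    List<Long> kept = new ArrayList<>();
    long total = 0;
    for (long count : bucketCounts) {
      kept.add(count);
      total += count;
      if (total > earlyStopRecordLimit) {
        break; // assumed: no further buckets are added once the limit is exceeded
      }
    }
    return kept;
  }
}
```

Because the limit is a plain argument, a test can exercise the early-stop boundary without building a `SourceState`.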



##########
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/Histogram.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.salesforce;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Getter;
+
+
+@Getter
+public class Histogram {
+  private long totalRecordCount;
+  private final List<HistogramGroup> groups;
+
+  Histogram() {

Review Comment:
   package-protected is quite uncommon in the project, and certainly not "the default".  if you're specifically choosing that, please explain the reasoning in the class-level javadoc.  otherwise, make this `public`, `protected`, whatever



##########
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java:
##########
@@ -292,10 +278,12 @@ private List<WorkUnit> generateWorkUnitsStrategy(SourceEntity sourceEntity, Sour
     int minTargetPartitionSize = state.getPropAsInt(MIN_TARGET_PARTITION_SIZE, DEFAULT_MIN_TARGET_PARTITION_SIZE);
 
     // Only support time related watermark
-    if (watermarkType == WatermarkType.SIMPLE || Strings.isNullOrEmpty(watermarkColumn) || !state.getPropAsBoolean(
-        ENABLE_DYNAMIC_PARTITIONING)) {
+    if (watermarkType == WatermarkType.SIMPLE
+        || Strings.isNullOrEmpty(watermarkColumn)
+        || !state.getPropAsBoolean(ENABLE_DYNAMIC_PARTITIONING)) {

Review Comment:
   also this within `SfConfig`
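Taken together, the `SfConfig` comments suggest parsing every Salesforce property once and exposing typed values. The sketch below uses a hypothetical `SfConfigSketch` class because the real `SfConfig` API is not shown in this review. The two key strings come from the diff. The early-stop default is a placeholder because the real `DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT` value is not shown, and `WatermarkType` is a stand-in enum. The soft-delete flag would be parsed the same way.

```java
import java.util.Properties;

/** Hypothetical config object: every Salesforce property is parsed once, up front. */
final class SfConfigSketch {
  static final String ENABLE_DYNAMIC_PARTITIONING = "salesforce.enableDynamicPartitioning";
  static final String EARLY_STOP_TOTAL_RECORDS_LIMIT = "salesforce.earlyStopTotalRecordsLimit";
  // Placeholder default: the real DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT is not shown in the diff.
  static final long DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT = 1_000_000L;

  /** Stand-in for Gobblin's WatermarkType, with only the constants this sketch needs. */
  enum WatermarkType { SIMPLE, TIMESTAMP }

  final boolean dynamicPartitioningEnabled;
  final long earlyStopTotalRecordsLimit;

  SfConfigSketch(Properties props) {
    // An unset flag reads as false, matching the unguarded getPropAsBoolean call in the diff.
    this.dynamicPartitioningEnabled =
        Boolean.parseBoolean(props.getProperty(ENABLE_DYNAMIC_PARTITIONING, "false"));
    this.earlyStopTotalRecordsLimit = Long.parseLong(props.getProperty(
        EARLY_STOP_TOTAL_RECORDS_LIMIT, String.valueOf(DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT)));
  }

  /** Negation of the diff's guard: only time-based watermarks with a column qualify. */
  boolean isDynamicPartitioningApplicable(WatermarkType type, String watermarkColumn) {
    return type != WatermarkType.SIMPLE
        && watermarkColumn != null
        && !watermarkColumn.isEmpty()
        && dynamicPartitioningEnabled;
  }
}
```

With the guard expressed as `isDynamicPartitioningApplicable`, `generateWorkUnitsHelper` could be tested by building a config from a few properties instead of a full `SourceState`.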





Issue Time Tracking
-------------------

    Worklog Id:     (was: 880096)
    Time Spent: 40m  (was: 0.5h)

> Refactor Salesforce Source to promote testability
> -------------------------------------------------
>
>                 Key: GOBBLIN-1904
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1904
>             Project: Apache Gobblin
>          Issue Type: Improvement
>          Components: gobblin-core
>            Reporter: William Lo
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Salesforce source is difficult to test because it relies on many internal classes
> whose inputs and outputs cannot be mocked. We should clean up this class and use
> composition to promote testability.
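The following minimal sketch shows the composition the issue describes. The source-side logic (`PartitionPlanner`, hypothetical) receives a `HistogramProvider` interface (also hypothetical) instead of querying Salesforce itself, so a test can pass a lambda as a fake. The greedy packing rule is a simplification for illustration only. It is not the PR's partitioning algorithm.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical seam: anything that can report per-bucket record counts for an entity. */
interface HistogramProvider {
  /** Record counts per time bucket, in ascending time order. */
  List<Long> bucketCounts(String entity);
}

/** Hypothetical consumer that depends on the interface, not on a live connector. */
final class PartitionPlanner {
  private final HistogramProvider histograms;

  PartitionPlanner(HistogramProvider histograms) {
    this.histograms = histograms;
  }

  /** Greedily packs consecutive buckets into partitions of at least targetSize records. */
  List<Long> partitionSizes(String entity, long targetSize) {
    List<Long> sizes = new ArrayList<>();
    long current = 0;
    for (long count : histograms.bucketCounts(entity)) {
      current += count;
      if (current >= targetSize) {
        sizes.add(current);
        current = 0;
      }
    }
    if (current > 0) {
      sizes.add(current); // the remainder becomes the last, smaller partition
    }
    return sizes;
  }
}
```

In production the provider would wrap the Salesforce queries. In tests it is a one-line lambda, so partitioning logic can be checked without network access.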



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
