[ 
https://issues.apache.org/jira/browse/GOBBLIN-1904?focusedWorklogId=880083&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-880083
 ]

ASF GitHub Bot logged work on GOBBLIN-1904:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Sep/23 02:29
            Start Date: 13/Sep/23 02:29
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #3768:
URL: https://github.com/apache/gobblin/pull/3768#discussion_r1323861172


##########
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceHistogramService.java:
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.salesforce;
+
+import com.google.common.math.DoubleMath;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.text.StrSubstitutor;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.exception.RestApiClientException;
+import 
org.apache.gobblin.source.extractor.exception.RestApiConnectionException;
+import 
org.apache.gobblin.source.extractor.exception.RestApiProcessingException;
+import org.apache.gobblin.source.extractor.extract.Command;
+import org.apache.gobblin.source.extractor.extract.CommandOutput;
+import org.apache.gobblin.source.extractor.extract.restapi.RestApiConnector;
+import org.apache.gobblin.source.extractor.partition.Partition;
+import org.apache.gobblin.source.extractor.partition.Partitioner;
+import org.apache.gobblin.source.extractor.utils.Utils;
+
+import static org.apache.gobblin.configuration.ConfigurationKeys.*;
+
+
+/**
+ * This class encapsulates everything related to histogram calculation for 
Salesforce. A histogram here refers to a
+ * mapping of number of records to be fetched by time intervals.
+ */
+@Slf4j
+public class SalesforceHistogramService {
+  // Partitions shorter than this are never split further by dynamic probing.
+  private static final int MIN_SPLIT_TIME_MILLIS = 1000;
+  // Suffix appended to day-granularity keys to form a full timestamp at midnight.
+  private static final String ZERO_TIME_SUFFIX = "-00:00:00";
+  private static final Gson GSON = new Gson();
+  // this is used to generate histogram buckets smaller than the target 
partition size to allow for more even
+  // packing of the generated partitions
+  private static final String PROBE_TARGET_RATIO = 
"salesforce.probeTargetRatio";
+  private static final double DEFAULT_PROBE_TARGET_RATIO = 0.60;
+  // Upper bound on the number of probe queries issued while refining the histogram.
+  private static final String DYNAMIC_PROBING_LIMIT = 
"salesforce.dynamicProbingLimit";
+  private static final int DEFAULT_DYNAMIC_PROBING_LIMIT = 1000;
+
+  // SOQL template: per-day record counts within [start, end); placeholders filled via StrSubstitutor.
+  private static final String DAY_PARTITION_QUERY_TEMPLATE =
+      "SELECT count(${column}) cnt, DAY_ONLY(${column}) time FROM ${table} " + 
"WHERE ${column} ${greater} ${start}"
+          + " AND ${column} ${less} ${end} GROUP BY DAY_ONLY(${column}) ORDER 
BY DAY_ONLY(${column})";
+  // SOQL template: total record count within a single probe range.
+  private static final String PROBE_PARTITION_QUERY_TEMPLATE = "SELECT 
count(${column}) cnt FROM ${table} "
+      + "WHERE ${column} ${greater} ${start} AND ${column} ${less} ${end}";
+
+  // Lazily created by getConnector(); protected so tests can inject a mock.
+  protected SalesforceConnector salesforceConnector = null;
+  private final SfConfig sfConfig;
+
+  /**
+   * Creates a histogram service driven by the given Salesforce configuration.
+   *
+   * @param sfConfig Salesforce-specific configuration used by the histogram queries
+   */
+  SalesforceHistogramService(SfConfig sfConfig) {
+    this.sfConfig = sfConfig;
+  }
+
+  /**
+   * Generate the histogram
+   */
+  /**
+   * Builds the record-count histogram for {@code entity} over the watermark range of
+   * {@code partition}, bucketed by day, and optionally refined by dynamic probing.
+   *
+   * @param entity the Salesforce object (table) to count records for
+   * @param watermarkColumn the timestamp column used for bucketing
+   * @param state source state carrying connection and probing configuration
+   * @param partition watermark range whose record distribution is being measured
+   * @return the histogram of record counts per time bucket
+   * @throws RuntimeException if the Salesforce connection cannot be established
+   */
+  Histogram getHistogram(String entity, String watermarkColumn, SourceState state,
+      Partition partition) {
+    SalesforceConnector conn = getConnector(state);
+
+    boolean connected;
+    try {
+      connected = conn.connect();
+    } catch (RestApiConnectionException e) {
+      throw new RuntimeException("Failed to connect.", e);
+    }
+    if (!connected) {
+      throw new RuntimeException("Failed to connect.");
+    }
+
+    Histogram result = getHistogramByDayBucketing(conn, entity, watermarkColumn, partition);
+
+    // Rewrite the first bucket's key to the exact global low watermark so the
+    // generated partitions are guaranteed to cover the start of the range.
+    Date lowWatermark = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT);
+    String lowWatermarkKey = Utils.epochToDate(lowWatermark.getTime(), SalesforceSource.SECONDS_FORMAT);
+    result.getGroups().set(0, new HistogramGroup(lowWatermarkKey, result.get(0).getCount()));
+
+    // Optionally split coarse day buckets further via dynamic probing.
+    return state.getPropAsBoolean(SalesforceSource.ENABLE_DYNAMIC_PROBING)
+        ? getRefinedHistogram(conn, entity, watermarkColumn, state, partition, result)
+        : result;
+  }
+
+  /**
+   * Get a histogram with day granularity buckets.
+   */
+  /**
+   * Computes a day-granularity histogram by issuing one SOQL GROUP BY query per
+   * calendar year in the partition's watermark range and concatenating the results.
+   *
+   * @param connector an already-connected Salesforce connector
+   * @param entity the Salesforce object (table) to count records for
+   * @param watermarkColumn the timestamp column used for bucketing
+   * @param partition watermark range (inclusivity of each end is honored)
+   * @return histogram with one group per day that has records
+   */
+  private Histogram getHistogramByDayBucketing(SalesforceConnector connector, String entity, String watermarkColumn,
+      Partition partition) {
+    Calendar cal = new GregorianCalendar();
+
+    Date lowDate = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT);
+    cal.setTime(lowDate);
+    int firstYear = cal.get(Calendar.YEAR);
+    String lowBound = Utils.dateToString(lowDate, SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);
+
+    Date highDate = Utils.toDate(partition.getHighWatermark(), Partitioner.WATERMARKTIMEFORMAT);
+    cal.setTime(highDate);
+    int lastYear = cal.get(Calendar.YEAR);
+    String highBound = Utils.dateToString(highDate, SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);
+
+    Map<String, String> tokens = new HashMap<>();
+    tokens.put("table", entity);
+    tokens.put("column", watermarkColumn);
+    StrSubstitutor substitutor = new StrSubstitutor(tokens);
+
+    Histogram result = new Histogram();
+    for (int year = firstYear; year <= lastYear; year++) {
+      boolean isFirst = (year == firstYear);
+      boolean isLast = (year == lastYear);
+
+      // First year starts at the low watermark; subsequent years start at Jan 1.
+      tokens.put("start", isFirst ? lowBound : getDateString(year));
+      tokens.put("greater", isFirst && !partition.isLowWatermarkInclusive() ? ">" : ">=");
+
+      // Last year ends at the high watermark; earlier years end at the next Jan 1 (exclusive).
+      tokens.put("end", isLast ? highBound : getDateString(year + 1));
+      tokens.put("less", isLast && partition.isHighWatermarkInclusive() ? "<=" : "<");
+
+      String query = substitutor.replace(DAY_PARTITION_QUERY_TEMPLATE);
+      log.info("Histogram query: " + query);
+      result.add(parseDayBucketingHistogram(getRecordsForQuery(connector, query)));
+    }
+
+    return result;
+  }
+
+  /**
+   * Lazily creates (and caches) the Salesforce connector used for histogram queries.
+   *
+   * <p>NOTE(review): consider passing the connector in from the source instead of
+   * creating a new one here — connectors can perform authentication/SSL handshake
+   * and verification, which adds latency. TODO confirm with the source owner.
+   *
+   * @param state job state used to configure the connector on first use
+   * @return the cached connector, created on first invocation
+   */
+  protected SalesforceConnector getConnector(State state) {
+    if (this.salesforceConnector == null) {
+      this.salesforceConnector = new SalesforceConnector(state);
+    }
+    return this.salesforceConnector;
+  }

Review Comment:
   Is it possible to send the connector from the source into this class instead 
of reinitializing it? Connectors can sometimes perform authentication/require 
SSL handshake and verification which adds latency.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 880083)
    Time Spent: 20m  (was: 10m)

> Refactor Salesforce Source to promote testability
> -------------------------------------------------
>
>                 Key: GOBBLIN-1904
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1904
>             Project: Apache Gobblin
>          Issue Type: Improvement
>          Components: gobblin-core
>            Reporter: William Lo
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Salesforce source is difficult to test due to many internal classes that 
> cannot mock inputs and outputs. We should clean this class and use 
> composition to promote testability.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to