Will-Lo commented on code in PR #3768:
URL: https://github.com/apache/gobblin/pull/3768#discussion_r1323861172


##########
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceHistogramService.java:
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.salesforce;
+
+import com.google.common.math.DoubleMath;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.text.StrSubstitutor;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.exception.RestApiClientException;
+import 
org.apache.gobblin.source.extractor.exception.RestApiConnectionException;
+import 
org.apache.gobblin.source.extractor.exception.RestApiProcessingException;
+import org.apache.gobblin.source.extractor.extract.Command;
+import org.apache.gobblin.source.extractor.extract.CommandOutput;
+import org.apache.gobblin.source.extractor.extract.restapi.RestApiConnector;
+import org.apache.gobblin.source.extractor.partition.Partition;
+import org.apache.gobblin.source.extractor.partition.Partitioner;
+import org.apache.gobblin.source.extractor.utils.Utils;
+
+import static org.apache.gobblin.configuration.ConfigurationKeys.*;
+
+
+/**
+ * This class encapsulates everything related to histogram calculation for 
Salesforce. A histogram here refers to a
+ * mapping of number of records to be fetched by time intervals.
+ */
+@Slf4j
+public class SalesforceHistogramService {
+  private static final int MIN_SPLIT_TIME_MILLIS = 1000;
+  private static final String ZERO_TIME_SUFFIX = "-00:00:00";
+  private static final Gson GSON = new Gson();
+  // this is used to generate histogram buckets smaller than the target 
partition size to allow for more even
+  // packing of the generated partitions
+  private static final String PROBE_TARGET_RATIO = 
"salesforce.probeTargetRatio";
+  private static final double DEFAULT_PROBE_TARGET_RATIO = 0.60;
+  private static final String DYNAMIC_PROBING_LIMIT = 
"salesforce.dynamicProbingLimit";
+  private static final int DEFAULT_DYNAMIC_PROBING_LIMIT = 1000;
+
+  private static final String DAY_PARTITION_QUERY_TEMPLATE =
+      "SELECT count(${column}) cnt, DAY_ONLY(${column}) time FROM ${table} " + 
"WHERE ${column} ${greater} ${start}"
+          + " AND ${column} ${less} ${end} GROUP BY DAY_ONLY(${column}) ORDER 
BY DAY_ONLY(${column})";
+  private static final String PROBE_PARTITION_QUERY_TEMPLATE = "SELECT 
count(${column}) cnt FROM ${table} "
+      + "WHERE ${column} ${greater} ${start} AND ${column} ${less} ${end}";
+
+  protected SalesforceConnector salesforceConnector = null;
+  private final SfConfig sfConfig;
+
+  SalesforceHistogramService(SfConfig sfConfig) {
+    this.sfConfig = sfConfig;
+  }
+
+  /**
+   * Generate the histogram
+   */
+  Histogram getHistogram(String entity, String watermarkColumn, SourceState 
state,
+      Partition partition) {
+    SalesforceConnector connector = getConnector(state);
+
+    try {
+      if (!connector.connect()) {
+        throw new RuntimeException("Failed to connect.");
+      }
+    } catch (RestApiConnectionException e) {
+      throw new RuntimeException("Failed to connect.", e);
+    }
+
+    Histogram histogram = getHistogramByDayBucketing(connector, entity, 
watermarkColumn, partition);
+
+    // exchange the first histogram group key with the global low watermark to 
ensure that the low watermark is captured
+    // in the range of generated partitions
+    HistogramGroup firstGroup = histogram.get(0);
+    Date lwmDate = Utils.toDate(partition.getLowWatermark(), 
Partitioner.WATERMARKTIMEFORMAT);
+    histogram.getGroups().set(0, new 
HistogramGroup(Utils.epochToDate(lwmDate.getTime(), 
SalesforceSource.SECONDS_FORMAT),
+        firstGroup.getCount()));
+
+    // refine the histogram
+    if (state.getPropAsBoolean(SalesforceSource.ENABLE_DYNAMIC_PROBING)) {
+      histogram = getRefinedHistogram(connector, entity, watermarkColumn, 
state, partition, histogram);
+    }
+
+    return histogram;
+  }
+
+  /**
+   * Get a histogram with day granularity buckets.
+   */
+  private Histogram getHistogramByDayBucketing(SalesforceConnector connector, 
String entity, String watermarkColumn,
+      Partition partition) {
+    Histogram histogram = new Histogram();
+
+    Calendar calendar = new GregorianCalendar();
+    Date startDate = Utils.toDate(partition.getLowWatermark(), 
Partitioner.WATERMARKTIMEFORMAT);
+    calendar.setTime(startDate);
+    int startYear = calendar.get(Calendar.YEAR);
+    String lowWatermarkDate = Utils.dateToString(startDate, 
SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);
+
+    Date endDate = Utils.toDate(partition.getHighWatermark(), 
Partitioner.WATERMARKTIMEFORMAT);
+    calendar.setTime(endDate);
+    int endYear = calendar.get(Calendar.YEAR);
+    String highWatermarkDate = Utils.dateToString(endDate, 
SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);
+
+    Map<String, String> values = new HashMap<>();
+    values.put("table", entity);
+    values.put("column", watermarkColumn);
+    StrSubstitutor sub = new StrSubstitutor(values);
+
+    for (int year = startYear; year <= endYear; year++) {
+      if (year == startYear) {
+        values.put("start", lowWatermarkDate);
+        values.put("greater", partition.isLowWatermarkInclusive() ? ">=" : 
">");
+      } else {
+        values.put("start", getDateString(year));
+        values.put("greater", ">=");
+      }
+
+      if (year == endYear) {
+        values.put("end", highWatermarkDate);
+        values.put("less", partition.isHighWatermarkInclusive() ? "<=" : "<");
+      } else {
+        values.put("end", getDateString(year + 1));
+        values.put("less", "<");
+      }
+
+      String query = sub.replace(DAY_PARTITION_QUERY_TEMPLATE);
+      log.info("Histogram query: " + query);
+
+      histogram.add(parseDayBucketingHistogram(getRecordsForQuery(connector, 
query)));
+    }
+
+    return histogram;
+  }
+
+  protected SalesforceConnector getConnector(State state) {
+    if (this.salesforceConnector == null) {
+      this.salesforceConnector = new SalesforceConnector(state);
+    }
+    return this.salesforceConnector;
+  }

Review Comment:
   Is it possible to pass the connector from the source into this class instead 
of re-initializing it here? Connectors sometimes perform authentication or 
require an SSL handshake and verification, which adds latency.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to