[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=358340=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-358340 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 12/Dec/19 06:14 Start Date: 12/Dec/19 06:14 Worklog Time Spent: 10m Work Description: asfgit commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 358340) Time Spent: 12h 20m (was: 12h 10m) > Add feature that enables PK-chunking in partition > -- > > Key: GOBBLIN-865 > URL: https://issues.apache.org/jira/browse/GOBBLIN-865 > Project: Apache Gobblin > Issue Type: Task > Reporter: Alex Li > Priority: Major > Labels: salesforce > Fix For: 0.15.0 > > Time Spent: 12h 20m > Remaining Estimate: 0h > > In the SFDC (Salesforce) connector, we have partitioning mechanisms to split a > giant query into multiple sub-queries. There are 3 mechanisms: > * simple partition (equally split by time) > * dynamic pre-partition (generate a histogram and split by row numbers) > * user-specified partition (set up a time range in the job file) > However, tables like Task and Contract fail from time to time to > fetch full data. > We may want to utilize PK-chunking to partition the query. > > The PK-chunking doc from SFDC - > [https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/async_api_headers_enable_pk_chunking.htm] -- This message was sent by Atlassian Jira (v8.3.4#803005)
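For context on the proposal quoted above: PK-chunking asks the Salesforce Bulk API to split one query into batches over the primary-key (Id) space rather than over a time column, via the documented `Sforce-Enable-PKChunking` request header. A stand-alone sketch of the underlying idea (class and method names are hypothetical; the real feature delegates the splitting to Salesforce server-side, this only models the arithmetic):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of PK-chunking: break one giant extract into
// sub-ranges over the primary key instead of over a time column.
// Salesforce does this server-side when the Sforce-Enable-PKChunking
// header is set; this client-side sketch only shows the arithmetic.
public class PkChunkingSketch {

    /** Split the inclusive key range [minPk, maxPk] into chunks of at most chunkSize keys. */
    static List<long[]> chunkRanges(long minPk, long maxPk, long chunkSize) {
        List<long[]> ranges = new ArrayList<>();
        for (long lo = minPk; lo <= maxPk; lo += chunkSize) {
            long hi = Math.min(lo + chunkSize - 1, maxPk);
            ranges.add(new long[] {lo, hi}); // inclusive [lo, hi] sub-range
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 250,000 keys at chunk size 100,000 -> 3 sub-queries
        for (long[] r : chunkRanges(1, 250_000, 100_000)) {
            System.out.println("WHERE Id >= " + r[0] + " AND Id <= " + r[1]);
        }
    }
}
```

Each resulting range plays the role of one batch, which is why a table that repeatedly times out under a single time-bounded query can still be fetched in full.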
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=358127=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-358127 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 11/Dec/19 22:12 Start Date: 11/Dec/19 22:12 Worklog Time Spent: 10m Work Description: codecov-io commented on issue #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#issuecomment-531069100 # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=h1) Report > Merging [#2722](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/2304f11e033febd4ab3389937ef8ff48b27e99ed?src=pr=desc) will **decrease** coverage by `0.13%`. > The diff coverage is `2.45%`. [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/graphs/tree.svg?width=650=4MgURJ0bGc=150=pr)](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=tree)
```diff
@@            Coverage Diff             @@
##           master    #2722      +/-   ##
- Coverage   45.62%   45.49%    -0.14%
+ Complexity   8989     8988        -1
  Files        1904     1904
  Lines       71355    71564      +209
  Branches     7876     7889       +13
- Hits        32559    32557        -2
- Misses      35795    36003      +208
- Partials     3001     3004        +3
```
| [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=tree) | Coverage Δ | Complexity Δ | | |---|---|---|---| | [...ce/extractor/extract/restapi/RestApiExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZXh0cmFjdC9yZXN0YXBpL1Jlc3RBcGlFeHRyYWN0b3IuamF2YQ==) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: | | 
[...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: | | [.../org/apache/gobblin/salesforce/ResultIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvUmVzdWx0SXRlcmF0b3IuamF2YQ==) | `0% <0%> (ø)` | `0 <0> (?)` | | | [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `19.81% <9.85%> (-2.95%)` | `12 <2> (+1)` | | | [...lin/elasticsearch/writer/FutureCallbackHolder.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tZWxhc3RpY3NlYXJjaC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9lbGFzdGljc2VhcmNoL3dyaXRlci9GdXR1cmVDYWxsYmFja0hvbGRlci5qYXZh) | `61.42% <0%> (-1.43%)` | `4% <0%> (ø)` | | | [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `63.42% <0%> (-1.39%)` | `27% <0%> (-1%)` | | | [.../apache/gobblin/runtime/api/JobExecutionState.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL0pvYkV4ZWN1dGlvblN0YXRlLmphdmE=) | `79.43% <0%> (-0.94%)` | `24% <0%> (ø)` | | | 
[...main/java/org/apache/gobblin/util/HadoopUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvSGFkb29wVXRpbHMuamF2YQ==) | `30.2% <0%> (-0.68%)` | `24% <0%> (-1%)` | | -- [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=continue). > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta) > `Δ = absolute (impact)`, `ø = not affected`, `? = missing data` > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=footer). Last update [2304f11...7d17db0](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=lastupdated). Read the [comment
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=358123=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-358123 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 11/Dec/19 22:06 Start Date: 11/Dec/19 22:06 Worklog Time Spent: 10m Work Description: codecov-io commented on issue #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#issuecomment-531069100 # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=h1) Report > Merging [#2722](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/2304f11e033febd4ab3389937ef8ff48b27e99ed?src=pr=desc) will **decrease** coverage by `41.49%`. > The diff coverage is `0%`. [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/graphs/tree.svg?width=650=4MgURJ0bGc=150=pr)](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=tree)
```diff
@@            Coverage Diff             @@
##           master   #2722       +/-   ##
===========================================
- Coverage   45.62%   4.13%    -41.5%
+ Complexity   8989     747     -8242
===========================================
  Files        1904    1904
  Lines       71355   71564      +209
  Branches     7876    7889       +13
===========================================
- Hits        32559    2961    -29598
- Misses      35795   68284    +32489
+ Partials     3001     319     -2682
```
| [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=tree) | Coverage Δ | Complexity Δ | | |---|---|---|---| | [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `0% <0%> (-22.77%)` | `0 <0> (-11)` | | | 
[...ce/extractor/extract/restapi/RestApiExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZXh0cmFjdC9yZXN0YXBpL1Jlc3RBcGlFeHRyYWN0b3IuamF2YQ==) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: | | [...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: | | [.../org/apache/gobblin/salesforce/ResultIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvUmVzdWx0SXRlcmF0b3IuamF2YQ==) | `0% <0%> (ø)` | `0 <0> (?)` | | | [...n/converter/AvroStringFieldDecryptorConverter.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tY3J5cHRvLXByb3ZpZGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbnZlcnRlci9BdnJvU3RyaW5nRmllbGREZWNyeXB0b3JDb252ZXJ0ZXIuamF2YQ==) | `0% <0%> (-100%)` | `0% <0%> (-2%)` | | | [...he/gobblin/cluster/TaskRunnerSuiteThreadModel.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvVGFza1J1bm5lclN1aXRlVGhyZWFkTW9kZWwuamF2YQ==) | `0% <0%> (-100%)` | `0% <0%> (-5%)` | | | [...n/mapreduce/avro/AvroKeyCompactorOutputFormat.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL2F2cm8vQXZyb0tleUNvbXBhY3Rvck91dHB1dEZvcm1hdC5qYXZh) | `0% <0%> (-100%)` | `0% <0%> (-3%)` | | | 
[...apache/gobblin/fork/CopyNotSupportedException.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZm9yay9Db3B5Tm90U3VwcG9ydGVkRXhjZXB0aW9uLmphdmE=) | `0% <0%> (-100%)` | `0% <0%> (-1%)` | | | [.../gobblin/kafka/writer/KafkaWriterCommonConfig.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2thZmthL3dyaXRlci9LYWZrYVdyaXRlckNvbW1vbkNvbmZpZy5qYXZh) | `0% <0%> (-100%)` | `0% <0%> (-7%)` | | |
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=356645=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356645 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 09/Dec/19 22:49 Start Date: 09/Dec/19 22:49 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r355728761 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
## @@ -17,9 +17,24 @@
 package org.apache.gobblin.salesforce;
 
-public class SalesforceConfigurationKeys {
+public final class SalesforceConfigurationKeys {
+  private SalesforceConfigurationKeys() {
+  }
   public static final String SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED = "source.querybased.salesforce.is.soft.deletes.pull.disabled";
-  public static final int DEFAULT_SALESFORCE_MAX_CHARS_IN_FILE = 2;
-  public static final int DEFAULT_SALESFORCE_MAX_ROWS_IN_FILE = 100;
+  public static final int DEFAULT_FETCH_RETRY_LIMIT = 5;
+  public static final String BULK_API_USE_QUERY_ALL = "salesforce.bulkApiUseQueryAll";
+
+  // pk-chunking
+  public static final String PK_CHUNKING_TEST_BATCH_ID_LIST = "salesforce.pkchunking.test.batch.id.list";
Review comment: thanks! fixed
> public static final String PK_CHUNKING_TEST_BATCH_ID_LIST = "salesforce.pkChunking.testBatchIdList";
> public static final String PK_CHUNKING_TEST_JOB_ID = "salesforce.pkChunking.testJobId";
Issue Time Tracking --- Worklog Id: (was: 356645) Time Spent: 11h 40m (was: 11.5h)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=356649=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356649 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 09/Dec/19 22:49 Start Date: 09/Dec/19 22:49 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r355729094 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultIterator.java
## @@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.sforce.async.BulkConnection;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
+import org.apache.gobblin.source.extractor.utils.Utils;
+
+
+/**
+ * Result Iterator.
+ * Take jobId and 'batchId:resultId,batchId2:resultId2' as input to build a result record iterator.
+ */
+@Slf4j
+public class ResultIterator implements Iterator<List<String>> {
+  private Iterator<ResultStruct> batchIdResultIdIterator;
+  private BulkConnection bulkConnection;
+  private InputStreamCSVReader csvReader;
+  private List<String> csvHeader;
+  private int columnSize;
+  private int urlLoadRetryLimit;
+  private ResultStruct resultStruct;
+  private List<String> currentRecord = null;
+  private Boolean isLoadedCurrentRecord = false;
+  private int currentFileRowCount = 0;
+  private int totalRowCount = 0;
+
+  /**
+   * constructor
+   * need to initiate the reader and currentRecord
+   */
+  public ResultIterator(BulkConnection bulkConnection, String jobId, String batchIdResultIdString, int urlLoadRetryLimit) {
+    this.urlLoadRetryLimit = urlLoadRetryLimit;
+    this.bulkConnection = bulkConnection;
+    this.batchIdResultIdIterator = this.parsebatchIdResultIdString(jobId, batchIdResultIdString);
+    if (this.batchIdResultIdIterator.hasNext()) {
+      this.resultStruct = this.batchIdResultIdIterator.next();
+      this.csvReader = this.fetchResultsetAsCsvReader(this.resultStruct); // first file reader
+    } else {
+      throw new RuntimeException("No batch-result id found.");
+    }
+    this.fulfilCurrentRecord();
+    this.csvHeader = this.currentRecord;
+    this.columnSize = this.csvHeader.size();
+    // after fetching csv header, clean up status
+    this.resetCurrentRecordStatus();
+  }
+
+  /**
+   * call reader.next and set up currentRecord
+   */
+  private void fulfilCurrentRecord() {
Review comment: fixed. thanks!
Issue Time Tracking --- Worklog Id: (was: 356649) Time Spent: 11h 50m (was: 11h 40m)
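The `ResultIterator` under review presents several per-batch result files as one record stream: the CSV header is read once from the first file, and repeated headers in later files are skipped. A simplified, self-contained model of that iteration pattern (hypothetical class; in-memory strings stand in for the Bulk API streams and `InputStreamCSVReader`):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Simplified model of the ResultIterator pattern: one record stream over
// several per-batch CSV payloads. The header comes from the first payload
// and is dropped again each time a subsequent payload is opened.
public class ChainedCsvIterator implements Iterator<List<String>> {
    private final Iterator<String> payloads; // one CSV string per batch result
    private Iterator<String> lines;          // lines of the payload being read
    private final List<String> header;

    public ChainedCsvIterator(List<String> csvPayloads) {
        this.payloads = csvPayloads.iterator();
        advancePayload();
        if (lines == null || !lines.hasNext()) {
            throw new IllegalArgumentException("no batch results");
        }
        this.header = split(lines.next()); // header taken from the first file
    }

    private void advancePayload() {
        lines = payloads.hasNext()
            ? Arrays.asList(payloads.next().split("\n")).iterator()
            : null;
    }

    private static List<String> split(String line) {
        return Arrays.asList(line.split(","));
    }

    public List<String> getHeader() {
        return header;
    }

    @Override
    public boolean hasNext() {
        while (lines != null && !lines.hasNext()) {
            advancePayload();
            if (lines != null && lines.hasNext()) {
                lines.next(); // later payloads repeat the header; drop it
            }
        }
        return lines != null;
    }

    @Override
    public List<String> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return split(lines.next());
    }
}
```

The real class adds retry logic (`urlLoadRetryLimit`) and row counting on top of the same skeleton.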
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=356636=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356636 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 09/Dec/19 22:47 Start Date: 09/Dec/19 22:47 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r355727983 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
## @@ -17,9 +17,24 @@
 package org.apache.gobblin.salesforce;
 
-public class SalesforceConfigurationKeys {
+public final class SalesforceConfigurationKeys {
+  private SalesforceConfigurationKeys() {
+  }
   public static final String SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED = "source.querybased.salesforce.is.soft.deletes.pull.disabled";
-  public static final int DEFAULT_SALESFORCE_MAX_CHARS_IN_FILE = 2;
-  public static final int DEFAULT_SALESFORCE_MAX_ROWS_IN_FILE = 100;
+  public static final int DEFAULT_FETCH_RETRY_LIMIT = 5;
+  public static final String BULK_API_USE_QUERY_ALL = "salesforce.bulkApiUseQueryAll";
+
+  // pk-chunking
+  public static final String PK_CHUNKING_TEST_BATCH_ID_LIST = "salesforce.pkchunking.test.batch.id.list";
+  public static final String PK_CHUNKING_TEST_JOB_ID = "salesforce.pkchunking.test.job.id";
+  public static final String SALESFORCE_PARTITION_TYPE = "salesforce.partitionType";
+  public static final String PARTITION_PK_CHUNKING_SIZE = "salesforce.partition.PkChunkingSize";
Review comment: thanks! fixed. good catch!
Issue Time Tracking --- Worklog Id: (was: 356636) Time Spent: 11.5h (was: 11h 20m)
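Pulling the keys from the two config diffs above together, a job would opt into the new partitioner roughly as follows. This is a sketch: the key strings are taken verbatim from the quoted (pre-review) diffs, the casing of `salesforce.partition.PkChunkingSize` was flagged in review so the merged names may differ, and the values are invented for illustration:

```java
import java.util.Properties;

// Hypothetical job setup using the configuration keys from the diffs above.
// Key names come from the quoted pre-review code; values are illustrative.
public class PkChunkingJobConfigSketch {
    public static void main(String[] args) {
        Properties job = new Properties();
        job.setProperty("salesforce.partitionType", "PK_CHUNKING");
        job.setProperty("salesforce.partition.PkChunkingSize", "250000");
        job.setProperty("salesforce.bulkApiUseQueryAll", "true");

        // A connector would branch on the partition type, as SalesforceSource does.
        boolean pkChunking =
            "PK_CHUNKING".equals(job.getProperty("salesforce.partitionType", ""));
        System.out.println(pkChunking ? "PK-chunking partitioner" : "query-based partitioner");
    }
}
```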
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=356633=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356633 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 09/Dec/19 22:45 Start Date: 09/Dec/19 22:45 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r355727268 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
## @@ -588,16 +566,41 @@ public String getTimestampPredicateCondition(String column, long value, String v
     return dataTypeMap;
   }
 
+
+  private Boolean isPkChunkingFetchDone = false;
+
+  private Iterator<JsonElement> getRecordSetPkchunking(WorkUnit workUnit) throws RuntimeException {
Review comment: fixed. thanks!
Issue Time Tracking --- Worklog Id: (was: 356633) Time Spent: 11h 20m (was: 11h 10m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=356632=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356632 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 09/Dec/19 22:44 Start Date: 09/Dec/19 22:44 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r355727010 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
## @@ -588,16 +566,41 @@ public String getTimestampPredicateCondition(String column, long value, String v
     return dataTypeMap;
   }
 
+
+  private Boolean isPkChunkingFetchDone = false;
+
+  private Iterator<JsonElement> getRecordSetPkchunking(WorkUnit workUnit) throws RuntimeException {
+    if (isPkChunkingFetchDone) {
+      return null; // must return null to represent no more data.
+    }
+    isPkChunkingFetchDone = true; // set to true, never come here twice.
Review comment: added. thanks!
Issue Time Tracking --- Worklog Id: (was: 356632) Time Spent: 11h 10m (was: 11h)
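The `isPkChunkingFetchDone` flag discussed above implements a contract in which returning `null` signals end-of-data: the whole PK-chunking result set is handed back as a single iterator on the first call, and every later call returns `null`. A minimal model of that one-shot pattern (hypothetical class; plain strings stand in for records):

```java
import java.util.Arrays;
import java.util.Iterator;

// Minimal model of the one-shot fetch in the diff above: the first call
// returns an iterator over all PK-chunking results; every later call
// returns null, which the caller treats as "no more data".
public class OneShotFetch {
    private boolean fetchDone = false;

    Iterator<String> getRecordSet() {
        if (fetchDone) {
            return null; // contract: null means the extractor is exhausted
        }
        fetchDone = true; // never build the iterator twice
        return Arrays.asList("rec1", "rec2", "rec3").iterator();
    }
}
```

Because the iterator is lazy (records are pulled file by file as it is consumed), handing everything back in one call does not require buffering the whole table.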
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=356631=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356631 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 09/Dec/19 22:43 Start Date: 09/Dec/19 22:43 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r355726754 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
## @@ -704,36 +708,114 @@ public boolean bulkApiLogin() throws Exception {
     return success;
   }
 
+  public JobIdAndBatchIdResultIdList getQueryResultIdsPkChunkingFetchOnly(String jobId, String batchIdListStr) {
Review comment: added javadoc
* Same as getQueryResultIdsPkChunking, but the arguments are different.
* This function can take existing batch ids and return a JobIdAndBatchIdResultIdList.
* It is for test/debug: developers may want to skip executing the query on SFDC and use a list of existing batch ids.
Issue Time Tracking --- Worklog Id: (was: 356631) Time Spent: 11h (was: 10h 50m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=356629=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356629 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 09/Dec/19 22:34 Start Date: 09/Dec/19 22:34 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r355722917 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
## @@ -146,12 +153,120 @@ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity
   @Override
   protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) {
+    log.info("sfdc connector with pkchk"); // TODO: remove this after merge back to OS.
+    List<WorkUnit> workUnits = null;
+    String partitionType = state.getProp(SALESFORCE_PARTITION_TYPE, "");
+    if (partitionType.equals("PK_CHUNKING")) {
+      // pk-chunking only supports start-time by source.querybased.start.value, and does not support end-time.
+      // always ingest data later than or equal source.querybased.start.value.
+      // we should only use pk-chunking based work units in case of snapshot/full ingestion
+      workUnits = generateWorkUnitsPkChunking(sourceEntity, state, previousWatermark);
+    } else {
+      workUnits = generateWorkUnitsStrategy(sourceEntity, state, previousWatermark);
+    }
+    log.info("Generated {} workUnit(s)", workUnits.size());
+    return workUnits;
+  }
+
+  /**
+   * generate workUnit for pk chunking
+   */
+  private List<WorkUnit> generateWorkUnitsPkChunking(SourceEntity sourceEntity, SourceState state, long previousWatermark) {
+    JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList = executeQueryWithPkChunking(state, previousWatermark);
+    return createWorkUnits(sourceEntity, state, jobIdAndBatchIdResultIdList);
+  }
+
+  private JobIdAndBatchIdResultIdList executeQueryWithPkChunking(
+      SourceState sourceState,
+      long previousWatermark
+  ) throws RuntimeException {
+    State state = new State(sourceState);
+    WorkUnit workUnit = WorkUnit.createEmpty();
+    WorkUnitState workUnitState = new WorkUnitState(workUnit, state);
+    workUnitState.setId("Execute pk-chunking");
+    try {
+      SalesforceExtractor salesforceExtractor = (SalesforceExtractor) this.getExtractor(workUnitState);
+      Partitioner partitioner = new Partitioner(sourceState);
+      if (isEarlyStopEnabled(state) && partitioner.isFullDump()) {
+        throw new UnsupportedOperationException("Early stop mode cannot work with full dump mode.");
+      }
+      Partition partition = partitioner.getGlobalPartition(previousWatermark);
+      String condition = "";
+      Date startDate = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT);
+      String field = sourceState.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY);
+      // pk-chunking only supports start-time by source.querybased.start.value, and does not support end-time.
+      // always ingest data later than or equal source.querybased.start.value.
+      // we should only use pk-chunking based work units in case of snapshot/full ingestion
+      if (startDate != null && field != null) {
+        String lowWatermarkDate = Utils.dateToString(startDate, SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);
+        condition = field + " >= " + lowWatermarkDate;
+      }
+      Predicate predicate = new Predicate(null, 0, condition, "", null);
+      List<Predicate> predicateList = Arrays.asList(predicate);
+      String entity = sourceState.getProp(ConfigurationKeys.SOURCE_ENTITY);
+
+      if (state.contains(PK_CHUNKING_TEST_JOB_ID)) {
+        String jobId = state.getProp(PK_CHUNKING_TEST_JOB_ID, "");
+        log.info("---Skip query, fetching result files directly for [jobId={}]", jobId);
+        String batchIdListStr = state.getProp(PK_CHUNKING_TEST_BATCH_ID_LIST);
+        return salesforceExtractor.getQueryResultIdsPkChunkingFetchOnly(jobId, batchIdListStr);
+      } else {
+        log.info("---Pk Chunking query submit.");
+        return salesforceExtractor.getQueryResultIdsPkChunking(entity, predicateList);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Create work units by taking a bulkJobId.
+   * The work units won't contain a query in this case. Instead they will contain a BulkJobId and a list of `batchId:resultId`
+   * So in extractor, the work to do is just to fetch the resultSet files.
+   */
+  private List<WorkUnit> createWorkUnits(
+      SourceEntity sourceEntity,
+      SourceState state,
+      JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList
+
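As the `createWorkUnits` javadoc above notes, pk-chunking work units carry a bulk `jobId` plus a `batchId:resultId,batchId2:resultId2` string instead of a SOQL query, so the extractor only downloads result files. A sketch of decoding that encoding (the `BatchIdResultId` holder is hypothetical; the PR uses its own `ResultStruct`/`JobIdAndBatchIdResultIdList` types for the same purpose):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of decoding the "batchId:resultId,batchId2:resultId2" string that
// pk-chunking work units carry instead of a query. The holder type is
// hypothetical; the PR's own ResultStruct plays this role.
public class BatchResultListSketch {

    static final class BatchIdResultId {
        final String batchId;
        final String resultId;
        BatchIdResultId(String batchId, String resultId) {
            this.batchId = batchId;
            this.resultId = resultId;
        }
    }

    /** Parse "b1:r1,b2:r2,..." into (batchId, resultId) pairs. */
    static List<BatchIdResultId> parse(String batchIdResultIdString) {
        List<BatchIdResultId> out = new ArrayList<>();
        for (String pair : batchIdResultIdString.split(",")) {
            String[] ids = pair.split(":", 2);
            if (ids.length != 2) {
                throw new IllegalArgumentException("bad batchId:resultId pair: " + pair);
            }
            out.add(new BatchIdResultId(ids[0], ids[1]));
        }
        return out;
    }
}
```

Grouping several pairs into one work unit is then a policy decision in the source, independent of how the pairs are encoded.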
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=356628=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356628 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 09/Dec/19 22:33 Start Date: 09/Dec/19 22:33 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r355722857 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java ## @@ -146,12 +153,120 @@ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity @Override protected List generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) { +log.info("sfdc connector with pkchk"); // TODO: remove this after merge back to OS. Review comment: good catch! removed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 356628) Time Spent: 10h 40m (was: 10.5h) > Add feature that enables PK-chunking in partition > -- > > Key: GOBBLIN-865 > URL: https://issues.apache.org/jira/browse/GOBBLIN-865 > Project: Apache Gobblin > Issue Type: Task >Reporter: Alex Li >Priority: Major > Labels: salesforce > Time Spent: 10h 40m > Remaining Estimate: 0h > > In SFDC(salesforce) connector, we have partitioning mechanisms to split a > giant query to multiple sub queries. There are 3 mechanisms: > * simple partition (equally split by time) > * dynamic pre-partition (generate histogram and split by row numbers) > * user specified partition (set up time range in job file) > However there are tables like Task and Contract are failing time to time to > fetch full data. 
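For context, the SFDC doc linked above enables PK chunking through a request header supplied when the Bulk API job is created. A representative header (values illustrative; per the doc, the default chunk size is 100,000 records and the maximum is 250,000):

```
Sforce-Enable-PKChunking: chunkSize=100000; startRow=00130000000xEftMGH
```

Salesforce then splits the job into batches of at most `chunkSize` records partitioned by primary key, which is what lets large tables like Task and Contract be fetched reliably.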
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=356624=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356624 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 09/Dec/19 22:30 Start Date: 09/Dec/19 22:30 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r355721539 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -588,16 +566,41 @@ public String getTimestampPredicateCondition(String column, long value, String v return dataTypeMap; } + + private Boolean isPkChunkingFetchDone = false; + + private Iterator getRecordSetPkchunking(WorkUnit workUnit) throws RuntimeException { +if (isPkChunkingFetchDone) { + return null; // must return null to represent no more data. +} +isPkChunkingFetchDone = true; // set to true, never come here twice. +try { + if (!bulkApiLogin()) { +throw new IllegalArgumentException("Invalid Login"); + } +} catch (Exception e) { + throw new RuntimeException(e); +} +String jobId = workUnit.getProp(PK_CHUNKING_JOB_ID); +String batchIdResultIdString = workUnit.getProp(PK_CHUNKING_BATCH_RESULT_IDS); +return new ResultIterator(bulkConnection, jobId, batchIdResultIdString, fetchRetryLimit); + } + @Override public Iterator getRecordSetFromSourceApi(String schema, String entity, WorkUnit workUnit, List predicateList) throws IOException { log.debug("Getting salesforce data using bulk api"); -RecordSet rs = null; +// new version of extractor: bulk api with pk-chunking in pre-partitioning of SalesforceSource +if (workUnit.contains(PK_CHUNKING_JOB_ID)) { + log.info("pk-chunking get record set" + workUnit.getProp(PK_CHUNKING_JOB_ID)); + return getRecordSetPkchunking(workUnit); Review comment: It won't be an easy change. 
We need to change the dynamic mode to execute all the sub-queries in one batch right after we split the giant query; then we could reuse the fetching module that PK-chunking uses, which is where `ResultIterator` comes in. However, we don't have to do that now. Issue Time Tracking --- Worklog Id: (was: 356624) Time Spent: 10.5h (was: 10h 20m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=353634=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-353634 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 04/Dec/19 18:35 Start Date: 04/Dec/19 18:35 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r353909717 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java ## @@ -146,12 +153,120 @@ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity @Override protected List generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) { +log.info("sfdc connector with pkchk"); // TODO: remove this after merge back to OS. +List workUnits = null; +String partitionType = state.getProp(SALESFORCE_PARTITION_TYPE, ""); +if (partitionType.equals("PK_CHUNKING")) { + // pk-chunking only supports start-time by source.querybased.start.value, and does not support end-time. + // always ingest data later than or equal source.querybased.start.value. 
+ // we should only pk chunking based work units only in case of snapshot/full ingestion + workUnits = generateWorkUnitsPkChunking(sourceEntity, state, previousWatermark); +} else { + workUnits = generateWorkUnitsStrategy(sourceEntity, state, previousWatermark); +} +log.info("Generated {} workUnit(s)", workUnits.size()); +return workUnits; + } + + /** + * generate workUnit for pk chunking + */ + private List generateWorkUnitsPkChunking(SourceEntity sourceEntity, SourceState state, long previousWatermark) { +JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList = executeQueryWithPkChunking(state, previousWatermark); +return createWorkUnits(sourceEntity, state, jobIdAndBatchIdResultIdList); + } + + private JobIdAndBatchIdResultIdList executeQueryWithPkChunking( + SourceState sourceState, + long previousWatermark + ) throws RuntimeException { +State state = new State(sourceState); +WorkUnit workUnit = WorkUnit.createEmpty(); +WorkUnitState workUnitState = new WorkUnitState(workUnit, state); +workUnitState.setId("Execute pk-chunking"); +try { + SalesforceExtractor salesforceExtractor = (SalesforceExtractor) this.getExtractor(workUnitState); + Partitioner partitioner = new Partitioner(sourceState); + if (isEarlyStopEnabled(state) && partitioner.isFullDump()) { +throw new UnsupportedOperationException("Early stop mode cannot work with full dump mode."); + } + Partition partition = partitioner.getGlobalPartition(previousWatermark); + String condition = ""; + Date startDate = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT); + String field = sourceState.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY); + // pk-chunking only supports start-time by source.querybased.start.value, and does not support end-time. + // always ingest data later than or equal source.querybased.start.value. 
+ // we should only pk chunking based work units only in case of snapshot/full ingestion + if (startDate != null && field != null) { +String lowWatermarkDate = Utils.dateToString(startDate, SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT); +condition = field + " >= " + lowWatermarkDate; + } + Predicate predicate = new Predicate(null, 0, condition, "", null); + List predicateList = Arrays.asList(predicate); + String entity = sourceState.getProp(ConfigurationKeys.SOURCE_ENTITY); + + if (state.contains(PK_CHUNKING_TEST_JOB_ID)) { +String jobId = state.getProp(PK_CHUNKING_TEST_JOB_ID, ""); +log.info("---Skip query, fetching result files directly for [jobId={}]", jobId); +String batchIdListStr = state.getProp(PK_CHUNKING_TEST_BATCH_ID_LIST); +return salesforceExtractor.getQueryResultIdsPkChunkingFetchOnly(jobId, batchIdListStr); + } else { +log.info("---Pk Chunking query submit."); +return salesforceExtractor.getQueryResultIdsPkChunking(entity, predicateList); + } +} catch (Exception e) { + throw new RuntimeException(e); +} + } + + /** + * Create work units by taking a bulkJobId. + * The work units won't contain a query in this case. Instead they will contain a BulkJobId and a list of `batchId:resultId` + * So in extractor, the work to do is just to fetch the resultSet files. + */ + private List createWorkUnits( + SourceEntity sourceEntity, + SourceState state, + JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList + )
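Pulling the settings in this hunk together, a hedged job-file fragment for selecting the PK-chunking path might look like the following (key names as quoted in this diff; `salesforce.partition.pkChunkingSize` uses the casing the review requests, and the entity and date values are purely illustrative):

```
source.entity=Task
salesforce.partitionType=PK_CHUNKING
salesforce.partition.pkChunkingSize=250000
# PK chunking honors only the start time; an end time is not supported,
# so rows later than or equal to this value are always ingested
source.querybased.start.value=2019-01-01-00:00:00
```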
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=353635=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-353635 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 04/Dec/19 18:35 Start Date: 04/Dec/19 18:35 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r353904979 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -588,16 +566,41 @@ public String getTimestampPredicateCondition(String column, long value, String v return dataTypeMap; } + + private Boolean isPkChunkingFetchDone = false; + + private Iterator getRecordSetPkchunking(WorkUnit workUnit) throws RuntimeException { +if (isPkChunkingFetchDone) { + return null; // must return null to represent no more data. +} +isPkChunkingFetchDone = true; // set to true, never come here twice. Review comment: Please add new lines between blocks, so before and after the if and try blocks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 353635) Time Spent: 10h (was: 9h 50m) > Add feature that enables PK-chunking in partition > -- > > Key: GOBBLIN-865 > URL: https://issues.apache.org/jira/browse/GOBBLIN-865 > Project: Apache Gobblin > Issue Type: Task >Reporter: Alex Li >Priority: Major > Labels: salesforce > Time Spent: 10h > Remaining Estimate: 0h > > In SFDC(salesforce) connector, we have partitioning mechanisms to split a > giant query to multiple sub queries. 
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=353630=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-353630 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 04/Dec/19 18:35 Start Date: 04/Dec/19 18:35 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r353379037 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultIterator.java ## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.salesforce; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.sforce.async.BulkConnection; +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader; +import org.apache.gobblin.source.extractor.utils.Utils; + + +/** + * Result Iterator. + * Take jobId and 'batchId:resultId,batchId2:resultId2' as input build a result record iterator. + */ +@Slf4j +public class ResultIterator implements Iterator { + private Iterator batchIdResultIdIterator; + private BulkConnection bulkConnection; + private InputStreamCSVReader csvReader; + private List csvHeader; + private int columnSize; + private int urlLoadRetryLimit; + private ResultStruct resultStruct; + private List currentRecord = null; + private Boolean isLoadedCurrentRecord = false; + private int currentFileRowCount = 0; + private int totalRowCount = 0; + + /** + * constructor + * need to initiate the reader and currentRecord + */ + public ResultIterator(BulkConnection bulkConnection, String jobId, String batchIdResultIdString, int urlLoadRetryLimit) { +this.urlLoadRetryLimit = urlLoadRetryLimit; +this.bulkConnection = bulkConnection; +this.batchIdResultIdIterator = this.parsebatchIdResultIdString(jobId, batchIdResultIdString); +if (this.batchIdResultIdIterator.hasNext()) { + this.resultStruct = this.batchIdResultIdIterator.next(); + this.csvReader = this.fetchResultsetAsCsvReader(this.resultStruct); // first file reader +} else { + throw new RuntimeException("No batch-result id found."); +} +this.fulfilCurrentRecord(); +this.csvHeader = this.currentRecord; +this.columnSize = this.csvHeader.size(); +// after fetching 
csv header, clean up status +this.resetCurrentRecordStatus(); + } + + /** + * call reader.next and set up currentRecord + */ + private void fulfilCurrentRecord() { Review comment: fulfilCurrentRecord -> fulfillCurrentRecord Issue Time Tracking --- Worklog Id: (was: 353630) Time Spent: 9.5h (was: 9h 20m)
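The `ResultIterator` under review chains a sequence of `batchId:resultId` result files into one record stream. Its control flow can be sketched with plain in-memory lists standing in for the Salesforce result-file downloads (all names here are illustrative):

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Illustrative sketch of ResultIterator's control flow: iterate records across
// an ordered list of "result files" (plain lists here), advancing to the next
// file whenever the current one is exhausted.
public class ChainedResultSketch implements Iterator<String> {
  private final Iterator<List<String>> files;
  private Iterator<String> current;

  public ChainedResultSketch(List<List<String>> resultFiles) {
    this.files = resultFiles.iterator();
    this.current = files.hasNext() ? files.next().iterator() : Collections.emptyIterator();
  }

  @Override
  public boolean hasNext() {
    // skip over empty or exhausted files until a record is found or files run out
    while (!current.hasNext() && files.hasNext()) {
      current = files.next().iterator();
    }
    return current.hasNext();
  }

  @Override
  public String next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }
}
```

The real class additionally strips the repeated CSV header from each file and retries failed downloads, but the file-to-file advance is the same shape.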
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=353637=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-353637 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 04/Dec/19 18:35 Start Date: 04/Dec/19 18:35 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r353912680 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -588,16 +566,41 @@ public String getTimestampPredicateCondition(String column, long value, String v return dataTypeMap; } + + private Boolean isPkChunkingFetchDone = false; + + private Iterator getRecordSetPkchunking(WorkUnit workUnit) throws RuntimeException { +if (isPkChunkingFetchDone) { + return null; // must return null to represent no more data. +} +isPkChunkingFetchDone = true; // set to true, never come here twice. +try { + if (!bulkApiLogin()) { +throw new IllegalArgumentException("Invalid Login"); + } +} catch (Exception e) { + throw new RuntimeException(e); +} +String jobId = workUnit.getProp(PK_CHUNKING_JOB_ID); +String batchIdResultIdString = workUnit.getProp(PK_CHUNKING_BATCH_RESULT_IDS); +return new ResultIterator(bulkConnection, jobId, batchIdResultIdString, fetchRetryLimit); + } + @Override public Iterator getRecordSetFromSourceApi(String schema, String entity, WorkUnit workUnit, List predicateList) throws IOException { log.debug("Getting salesforce data using bulk api"); -RecordSet rs = null; +// new version of extractor: bulk api with pk-chunking in pre-partitioning of SalesforceSource +if (workUnit.contains(PK_CHUNKING_JOB_ID)) { + log.info("pk-chunking get record set" + workUnit.getProp(PK_CHUNKING_JOB_ID)); + return getRecordSetPkchunking(workUnit); Review comment: What would it take to unify the code? 
I see that the new `ResultIterator` class is not used for the non-PK-chunking bulk fetch. Issue Time Tracking --- Worklog Id: (was: 353637) Time Spent: 10h 10m (was: 10h)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=353633=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-353633 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 04/Dec/19 18:35 Start Date: 04/Dec/19 18:35 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r353901857 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java ## @@ -17,9 +17,24 @@ package org.apache.gobblin.salesforce; -public class SalesforceConfigurationKeys { +public final class SalesforceConfigurationKeys { + private SalesforceConfigurationKeys() { + } public static final String SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED = "source.querybased.salesforce.is.soft.deletes.pull.disabled"; - public static final int DEFAULT_SALESFORCE_MAX_CHARS_IN_FILE = 2; - public static final int DEFAULT_SALESFORCE_MAX_ROWS_IN_FILE = 100; + public static final int DEFAULT_FETCH_RETRY_LIMIT = 5; + public static final String BULK_API_USE_QUERY_ALL = "salesforce.bulkApiUseQueryAll"; + + // pk-chunking + public static final String PK_CHUNKING_TEST_BATCH_ID_LIST = "salesforce.pkchunking.test.batch.id.list"; + public static final String PK_CHUNKING_TEST_JOB_ID = "salesforce.pkchunking.test.job.id"; + public static final String SALESFORCE_PARTITION_TYPE = "salesforce.partitionType"; + public static final String PARTITION_PK_CHUNKING_SIZE = "salesforce.partition.PkChunkingSize"; Review comment: Should be "salesforce.partition.pkChunkingSize" This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 353633) Time Spent: 9h 50m (was: 9h 40m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=353631=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-353631 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 04/Dec/19 18:35 Start Date: 04/Dec/19 18:35 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r353900930 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java ## @@ -17,9 +17,24 @@ package org.apache.gobblin.salesforce; -public class SalesforceConfigurationKeys { +public final class SalesforceConfigurationKeys { + private SalesforceConfigurationKeys() { + } public static final String SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED = "source.querybased.salesforce.is.soft.deletes.pull.disabled"; - public static final int DEFAULT_SALESFORCE_MAX_CHARS_IN_FILE = 2; - public static final int DEFAULT_SALESFORCE_MAX_ROWS_IN_FILE = 100; + public static final int DEFAULT_FETCH_RETRY_LIMIT = 5; + public static final String BULK_API_USE_QUERY_ALL = "salesforce.bulkApiUseQueryAll"; + + // pk-chunking + public static final String PK_CHUNKING_TEST_BATCH_ID_LIST = "salesforce.pkchunking.test.batch.id.list"; Review comment: Please follow the new naming convention for new config, like "salesforce.pkChunking.testBatchIdList" and "salesforce.pkChunking.testJobId" where the dots are for namespacing. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 353631) Time Spent: 9h 40m (was: 9.5h)
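Applied to the keys in this diff, the suggested convention (dots as namespace separators, camelCase within a segment) would read as follows (values illustrative):

```
salesforce.pkChunking.testJobId=750x000000000AAA
salesforce.pkChunking.testBatchIdList=751x01,751x02
```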
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=353632=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-353632 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 04/Dec/19 18:35 Start Date: 04/Dec/19 18:35 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r353904405 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -588,16 +566,41 @@ public String getTimestampPredicateCondition(String column, long value, String v return dataTypeMap; } + + private Boolean isPkChunkingFetchDone = false; + + private Iterator getRecordSetPkchunking(WorkUnit workUnit) throws RuntimeException { Review comment: getRecordSetPkchunking -> getRecordSetPkChunking This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 353632) Time Spent: 9h 40m (was: 9.5h) > Add feature that enables PK-chunking in partition > -- > > Key: GOBBLIN-865 > URL: https://issues.apache.org/jira/browse/GOBBLIN-865 > Project: Apache Gobblin > Issue Type: Task >Reporter: Alex Li >Priority: Major > Labels: salesforce > Time Spent: 9h 40m > Remaining Estimate: 0h > > In SFDC(salesforce) connector, we have partitioning mechanisms to split a > giant query to multiple sub queries. There are 3 mechanisms: > * simple partition (equally split by time) > * dynamic pre-partition (generate histogram and split by row numbers) > * user specified partition (set up time range in job file) > However there are tables like Task and Contract are failing time to time to > fetch full data. 
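The `isPkChunkingFetchDone` guard discussed in this hunk implements a run-once handoff: the first call returns the record iterator, and every later call returns null, which the caller treats as "no more data". Stripped of Salesforce specifics, the pattern is (illustrative names):

```java
import java.util.Arrays;
import java.util.Iterator;

// Illustrative sketch of the run-once pattern in getRecordSetPkchunking:
// the first call hands back the full record iterator; subsequent calls
// return null, the framework's signal that no more data is available.
public class RunOnceFetchSketch {
  private boolean fetchDone = false;

  public Iterator<String> getRecordSet() {
    if (fetchDone) {
      return null; // null signals no more data to the caller
    }
    fetchDone = true; // never hand out the iterator twice
    return Arrays.asList("rec1", "rec2").iterator();
  }
}
```

Since the iterator already spans every `batchId:resultId` file, a single handoff covers the whole work unit.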
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=353638=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-353638 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 04/Dec/19 18:35 Start Date: 04/Dec/19 18:35 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r353908976 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java ## @@ -146,12 +153,120 @@ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity @Override protected List generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) { +log.info("sfdc connector with pkchk"); // TODO: remove this after merge back to OS. Review comment: Should be removed according to TODO? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 353638) Time Spent: 10h 20m (was: 10h 10m) > Add feature that enables PK-chunking in partition > -- > > Key: GOBBLIN-865 > URL: https://issues.apache.org/jira/browse/GOBBLIN-865 > Project: Apache Gobblin > Issue Type: Task >Reporter: Alex Li >Priority: Major > Labels: salesforce > Time Spent: 10h 20m > Remaining Estimate: 0h > > In SFDC(salesforce) connector, we have partitioning mechanisms to split a > giant query to multiple sub queries. 
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=353636&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-353636 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 04/Dec/19 18:35 Start Date: 04/Dec/19 18:35 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r353906224 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -704,36 +708,114 @@ public boolean bulkApiLogin() throws Exception { return success; } + public JobIdAndBatchIdResultIdList getQueryResultIdsPkChunkingFetchOnly(String jobId, String batchIdListStr) { Review comment: Please add javadoc for this public method. If this is only for testing, mention that in the javadoc. Issue Time Tracking --- Worklog Id: (was: 353636)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=318802&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-318802 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 26/Sep/19 05:48 Start Date: 26/Sep/19 05:48 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r328443106 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -871,8 +871,10 @@ public SalesforceBulkJobId getQueryResultIdsPkChunking(String entity, List
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=317008&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-317008 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 23/Sep/19 22:19 Start Date: 23/Sep/19 22:19 Worklog Time Spent: 10m Work Description: codecov-io commented on issue #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#issuecomment-531069100

# [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr&el=h1) Report

> Merging [#2722](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/9bf9a882427e98e7f4ef089c4ca1bde42f4b36a3?src=pr&el=desc) will **increase** coverage by `<.01%`.
> The diff coverage is `1.64%`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/graphs/tree.svg?width=650&token=4MgURJ0bGc&height=150&src=pr)](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr&el=tree)

```diff
@@             Coverage Diff             @@
##           master    #2722       +/-   ##
===========================================
+ Coverage   45.04%   45.05%    +<.01%
- Complexity   8739     8780      +41
  Files        1880     1886       +6
  Lines       70205    70642     +437
  Branches     7707     7745      +38
+ Hits        31623    31826     +203
- Misses      35651    35870     +219
- Partials     2931     2946      +15
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...obblin/salesforce/SalesforceConfigurationKeys.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUNvbmZpZ3VyYXRpb25LZXlzLmphdmE=) | `0% <ø> (ø)` | `0 <0> (ø)` | :arrow_down: |
| [...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
| [...ce/extractor/extract/restapi/RestApiExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZXh0cmFjdC9yZXN0YXBpL1Jlc3RBcGlFeHRyYWN0b3IuamF2YQ==) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
| [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `19.74% <5.66%> (-3.02%)` | `12 <1> (+1)` | |
| [...obblin/service/monitoring/FlowStatusGenerator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9uaXRvcmluZy9GbG93U3RhdHVzR2VuZXJhdG9yLmphdmE=) | `82.14% <0%> (-7.15%)` | `11% <0%> (-1%)` | |
| [...bblin/compaction/mapreduce/orc/OrcValueMapper.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL29yYy9PcmNWYWx1ZU1hcHBlci5qYXZh) | `78.87% <0%> (-2.38%)` | `16% <0%> (+11%)` | |
| [...apache/gobblin/runtime/local/LocalJobLauncher.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvbG9jYWwvTG9jYWxKb2JMYXVuY2hlci5qYXZh) | `61.81% <0%> (-2.34%)` | `5% <0%> (ø)` | |
| [...ache/gobblin/couchbase/writer/CouchbaseWriter.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tY291Y2hiYXNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvdWNoYmFzZS93cml0ZXIvQ291Y2hiYXNlV3JpdGVyLmphdmE=) | `64.39% <0%> (-1.89%)` | `15% <0%> (+4%)` | |
| [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `63.88% <0%> (-0.9%)` | `28% <0%> (-1%)` | |
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=316988&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316988 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 23/Sep/19 21:41 Start Date: 23/Sep/19 21:41 Worklog Time Spent: 10m Work Description: arekusuri commented on issue #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#issuecomment-534296716 Hi @htran1 and @zxcware Please hold off on this pull request for a bit. We will try another approach, to make sure the code is mature enough before we merge it into open source. The doc is here - https://docs.google.com/document/d/1fJ7Gju9tXR8WBbwxct0_l21Ijhb4hZJykYFDhDcRLp4/edit#heading=h.37qi9whhekol Issue Time Tracking --- Worklog Id: (was: 316988) Time Spent: 8h 50m (was: 8h 40m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=316920&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316920 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 23/Sep/19 18:54 Start Date: 23/Sep/19 18:54 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r327275396 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -598,6 +595,8 @@ public String getTimestampPredicateCondition(String column, long value, String v String[] batchIdResultIdArray = partitionPkChunkingBatchIdResultIterator.next().split(":"); String batchId = batchIdResultIdArray[0]; String resultId = batchIdResultIdArray[1]; +log.info(String.format("PK-Chunking work unit: fetching file for (jobId=%s, batchId=%s, resultId=%s) ", Review comment: thanks for the offline talk, fixed. :) Issue Time Tracking --- Worklog Id: (was: 316920) Time Spent: 8h 40m (was: 8.5h)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=316878&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316878 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 23/Sep/19 17:59 Start Date: 23/Sep/19 17:59 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r327251198 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -1130,35 +1120,28 @@ public void closeConnection() throws Exception { /** * Waits for the PK batches to complete. The wait will stop after all batches are complete or on the first failed batch - * @param batchInfoList list of batch info - * @param waitInterval the polling interval - * @return the last {@link BatchInfo} processed - * @throws InterruptedException - * @throws AsyncApiException */ - private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int waitInterval) - throws InterruptedException, AsyncApiException { -BatchInfo batchInfo = null; + private void waitForPkBatches(String jobId, BatchInfoList batchInfoList, int waitInterval) { +long toWait = (long)waitInterval * 1000; BatchInfo[] batchInfos = batchInfoList.getBatchInfo(); - +log.info(String.format("Waiting for bulk (jobId=%s)", jobId)); Review comment: fixed. thanks! Issue Time Tracking --- Worklog Id: (was: 316878) Time Spent: 8.5h (was: 8h 20m)
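The `waitForPkBatches` change discussed above centers on a poll-until-done loop over the PK-chunking batches. A simplified, self-contained sketch of that pattern follows — `BatchState` is a stand-in for Salesforce's `BatchStateEnum`, the state supplier stands in for re-querying batch info from the server, and the structure is illustrative rather than the connector's exact code:

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class PkBatchWaitSketch {

    // Stand-in for com.sforce.async.BatchStateEnum.
    enum BatchState { Queued, InProgress, Completed, Failed }

    // True if any batch has failed; the wait should stop immediately in that case.
    static boolean anyFailed(List<BatchState> states) {
        return states.contains(BatchState.Failed);
    }

    // True once every batch has completed.
    static boolean allCompleted(List<BatchState> states) {
        return states.stream().allMatch(s -> s == BatchState.Completed);
    }

    // Polls batch states until all are Completed, failing fast on the first Failed
    // batch. pollStates would re-query the server each round in real code;
    // waitIntervalSeconds is the sleep between polls, as in the diff above.
    static void waitForBatches(String jobId, Supplier<List<BatchState>> pollStates,
                               int waitIntervalSeconds) throws InterruptedException {
        long toWait = (long) waitIntervalSeconds * 1000; // seconds -> millis
        while (true) {
            List<BatchState> states = pollStates.get();
            if (anyFailed(states)) {
                throw new RuntimeException("A batch failed for jobId=" + jobId);
            }
            if (allCompleted(states)) {
                return;
            }
            Thread.sleep(toWait);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate two batches that finish on the second poll.
        Iterator<List<BatchState>> polls = List.of(
                List.of(BatchState.InProgress, BatchState.Queued),
                List.of(BatchState.Completed, BatchState.Completed)).iterator();
        waitForBatches("sketch-job-id", polls::next, 0);
        System.out.println("all batches completed");
    }
}
```

Failing fast on the first failed batch, as the refactored method does, avoids sleeping through the remaining poll intervals for a job that can no longer succeed.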
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=316876&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316876 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 23/Sep/19 17:58 Start Date: 23/Sep/19 17:58 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r327250834 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -598,6 +595,8 @@ public String getTimestampPredicateCondition(String column, long value, String v String[] batchIdResultIdArray = partitionPkChunkingBatchIdResultIterator.next().split(":"); String batchId = batchIdResultIdArray[0]; String resultId = batchIdResultIdArray[1]; +log.info(String.format("PK-Chunking work unit: fetching file for (jobId=%s, batchId=%s, resultId=%s) ", Review comment: thanks for the online talk, fixed. Issue Time Tracking --- Worklog Id: (was: 316876) Time Spent: 8h 10m (was: 8h)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=316874&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316874 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 23/Sep/19 17:51 Start Date: 23/Sep/19 17:51 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r327247892 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -598,6 +595,8 @@ public String getTimestampPredicateCondition(String column, long value, String v String[] batchIdResultIdArray = partitionPkChunkingBatchIdResultIterator.next().split(":"); String batchId = batchIdResultIdArray[0]; String resultId = batchIdResultIdArray[1]; +log.info(String.format("PK-Chunking work unit: fetching file for (jobId=%s, batchId=%s, resultId=%s) ", Review comment: BTW, we were already using this style in a lot of places; I copied the code :) I did some searching in our code but didn't find a good example. Can you point me to sample code? Are you talking about `MessageFormat`? Issue Time Tracking --- Worklog Id: (was: 316874) Time Spent: 8h (was: 7h 50m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=316863&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316863 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 23/Sep/19 17:40 Start Date: 23/Sep/19 17:40 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r327243183 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -857,7 +850,7 @@ public SalesforceBulkJobId getQueryResultIdsPkChunking(String entity, List
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=316858&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316858 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 23/Sep/19 17:36 Start Date: 23/Sep/19 17:36 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r327241495 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -869,7 +862,7 @@ public SalesforceBulkJobId getQueryResultIdsPkChunking(String entity, List
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=316381&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316381 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 23/Sep/19 02:32 Start Date: 23/Sep/19 02:32 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r326307389 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -869,7 +862,7 @@ public SalesforceBulkJobId getQueryResultIdsPkChunking(String entity, List
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=316380&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316380 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 23/Sep/19 02:31 Start Date: 23/Sep/19 02:31 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r326934518 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -1130,35 +1120,28 @@ public void closeConnection() throws Exception { /** * Waits for the PK batches to complete. The wait will stop after all batches are complete or on the first failed batch - * @param batchInfoList list of batch info - * @param waitInterval the polling interval - * @return the last {@link BatchInfo} processed - * @throws InterruptedException - * @throws AsyncApiException */ - private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int waitInterval) - throws InterruptedException, AsyncApiException { -BatchInfo batchInfo = null; + private void waitForPkBatches(String jobId, BatchInfoList batchInfoList, int waitInterval) { +long toWait = (long)waitInterval * 1000; BatchInfo[] batchInfos = batchInfoList.getBatchInfo(); - +log.info(String.format("Waiting for bulk (jobId=%s)", jobId)); Review comment: Logger {} formatting. Issue Time Tracking --- Worklog Id: (was: 316380)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=316376&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316376 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 23/Sep/19 02:31 Start Date: 23/Sep/19 02:31 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r326934432 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -598,6 +595,8 @@ public String getTimestampPredicateCondition(String column, long value, String v String[] batchIdResultIdArray = partitionPkChunkingBatchIdResultIterator.next().split(":"); String batchId = batchIdResultIdArray[0]; String resultId = batchIdResultIdArray[1]; +log.info(String.format("PK-Chunking work unit: fetching file for (jobId=%s, batchId=%s, resultId=%s) ", Review comment: Please use {} logger format instead of String.format. Issue Time Tracking --- Worklog Id: (was: 316376) Time Spent: 7h (was: 6h 50m)
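The review comment above refers to SLF4J's parameterized logging: `log.info("Waiting for bulk (jobId={})", jobId)` defers building the message string until the logger confirms the level is enabled, whereas `String.format` always constructs it. The following is a simplified stand-in formatter to illustrate the `{}` substitution — it is not SLF4J's actual `MessageFormatter` implementation:

```java
public class PlaceholderFormatSketch {

    // Replace each "{}" in the template with the next argument, left to right,
    // mimicking the substitution SLF4J performs only when the level is enabled.
    static String format(String template, Object... args) {
        StringBuilder out = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = template.indexOf("{}", from);
            if (at < 0) {
                break; // more args than placeholders: ignore the extras
            }
            out.append(template, from, at).append(arg);
            from = at + 2;
        }
        out.append(template.substring(from));
        return out.toString();
    }

    public static void main(String[] args) {
        // Eager, as flagged by the reviewer:
        //   log.info(String.format("Waiting for bulk (jobId=%s)", jobId));
        // Lazy, as requested ("someJobId" is a placeholder value):
        //   log.info("Waiting for bulk (jobId={})", jobId);
        System.out.println(format("Waiting for bulk (jobId={})", "someJobId"));
    }
}
```

With the `{}` form, a disabled log level costs only the level check; with `String.format`, the formatting work is done regardless of whether the message is ever emitted.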
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=316377&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316377 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 23/Sep/19 02:31 Start Date: 23/Sep/19 02:31 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r326307389 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -869,7 +862,7 @@ public SalesforceBulkJobId getQueryResultIdsPkChunking(String entity, List
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=316379=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316379 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 23/Sep/19 02:31 Start Date: 23/Sep/19 02:31 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r326934459 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -775,17 +775,15 @@ public SalesforceBulkJobId getQueryResultIdsPkChunking(String entity, List
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=316378=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316378 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 23/Sep/19 02:31 Start Date: 23/Sep/19 02:31 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r326309541 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -857,7 +850,7 @@ public SalesforceBulkJobId getQueryResultIdsPkChunking(String entity, List
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=315275=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-315275 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 19/Sep/19 19:43 Start Date: 19/Sep/19 19:43 Worklog Time Spent: 10m Work Description: codecov-io commented on issue #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#issuecomment-531069100 Codecov report: merging #2722 into master will increase coverage by <.01%; the diff coverage is 1.66%.
```diff
@@            Coverage Diff             @@
##           master    #2722      +/-  ##
=========================================
+ Coverage   45.04%   45.04%    +<.01%
- Complexity   8739     8778      +39
  Files        1880     1886       +6
  Lines       70205    70644     +439
  Branches     7707     7747      +40
+ Hits        31623    31824     +201
- Misses      35651    35872     +221
- Partials     2931     2948      +17
```
Most impacted files: SalesforceConfigurationKeys (0%), SalesforceExtractor (0%), RestApiExtractor (0%), SalesforceSource (19.74%, -3.02%), FlowStatusGenerator (82.14%, -7.15%), OrcValueMapper (75%, -6.25%), LocalJobLauncher (61.81%, -2.34%), CouchbaseWriter (64.39%, -1.89%), ZookeeperBasedJobLock (63.33%, -0.72%).
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=314859=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-314859 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 19/Sep/19 07:03 Start Date: 19/Sep/19 07:03 Worklog Time Spent: 10m Work Description: codecov-io commented on issue #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#issuecomment-531069100 Codecov report: merging #2722 into master will decrease coverage by 0.02%; the diff coverage is 1.65%.
```diff
@@            Coverage Diff             @@
##           master    #2722      +/-  ##
=========================================
- Coverage   45.04%   45.01%    -0.03%
- Complexity   8739     8768      +29
  Files        1880     1886       +6
  Lines       70205    70589     +384
  Branches     7707     7737      +30
+ Hits        31623    31775     +152
- Misses      35651    35862     +211
- Partials     2931     2952      +21
```
Most impacted files: SalesforceConfigurationKeys (0%), SalesforceExtractor (0%), RestApiExtractor (0%), SalesforceSource (19.74%, -3.02%), FlowStatusGenerator (82.14%, -7.15%), LocalJobLauncher (61.81%, -2.34%), CouchbaseWriter (64.39%, -1.89%), GobblinTaskRunner (63.88%, -0.9%), GobblinClusterManager (53.91%, -0.51%).
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=314847=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-314847 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 19/Sep/19 06:29 Start Date: 19/Sep/19 06:29 Worklog Time Spent: 10m Work Description: arekusuri commented on issue #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#issuecomment-532986963 Test result: https://ltx1-holdemaz05.grid.linkedin.com:8443/hdfs/jobs/exttest/lumos/uif-avro/uif/Salesforce_Core/User/20190919062550_full/part.task_Salesforce_User_full_1568874183318_0_0.avro Issue Time Tracking --- Worklog Id: (was: 314847) Time Spent: 6.5h (was: 6h 20m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=314508=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-314508 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 18/Sep/19 17:34 Start Date: 18/Sep/19 17:34 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r325803379 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -144,28 +142,18 @@ private final boolean pkChunkingSkipCountCheck; private final boolean bulkApiUseQueryAll; + private WorkUnitState workUnitState; + public SalesforceExtractor(WorkUnitState state) { super(state); -this.sfConnector = (SalesforceConnector) this.connector; - -// don't allow pk chunking if max partitions too high or have user specified partitions -if (state.getPropAsBoolean(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, false) -|| state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS, -ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS) > PK_CHUNKING_MAX_PARTITIONS_LIMIT) { - if (state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false)) { -log.warn("Max partitions too high, so PK chunking is not enabled"); - } - - this.pkChunking = false; -} else { - this.pkChunking = state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false); -} +this.workUnitState = state; +this.sfConnector = (SalesforceConnector) this.connector; this.pkChunkingSize = Math.max(MIN_PK_CHUNKING_SIZE, -Math.min(MAX_PK_CHUNKING_SIZE, state.getPropAsInt(PK_CHUNKING_SIZE_KEY, DEFAULT_PK_CHUNKING_SIZE))); +Math.min(MAX_PK_CHUNKING_SIZE, state.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, DEFAULT_PK_CHUNKING_SIZE))); -this.pkChunkingSkipCountCheck = state.getPropAsBoolean(PK_CHUNKING_SKIP_COUNT_CHECK, DEFAULT_PK_CHUNKING_SKIP_COUNT_CHECK); +this.pkChunkingSkipCountCheck = true;// 
won't be able to get count Review comment: removed. thanks! Issue Time Tracking --- Worklog Id: (was: 314508) Time Spent: 6h 20m (was: 6h 10m)
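The clamping expression in the diff above bounds the configured chunk size between a minimum and Salesforce's 250,000-record maximum. A standalone sketch of that logic, plus the header value it would feed into (constant names mirror the diff; the minimum and default values here are assumptions, not taken from the PR):

```java
// Sketch: clamp a configured PK-chunking size into an allowed range and
// render the Bulk API header value that enables PK chunking.
public class PkChunkingHeader {
    static final int MIN_PK_CHUNKING_SIZE = 20_000;      // assumed lower bound
    static final int MAX_PK_CHUNKING_SIZE = 250_000;     // Salesforce hard limit
    static final int DEFAULT_PK_CHUNKING_SIZE = 100_000; // assumed default

    // Same shape as the diff: Math.max(MIN, Math.min(MAX, configured)).
    static int clampChunkSize(int configured) {
        return Math.max(MIN_PK_CHUNKING_SIZE, Math.min(MAX_PK_CHUNKING_SIZE, configured));
    }

    // Value for the Sforce-Enable-PKChunking request header.
    static String headerValue(int configured) {
        return "chunkSize=" + clampChunkSize(configured);
    }

    public static void main(String[] args) {
        // A value above the maximum is clamped down to 250,000.
        System.out.println(headerValue(1_000_000)); // chunkSize=250000
    }
}
```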
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=313924=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-313924 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 17/Sep/19 21:15 Start Date: 17/Sep/19 21:15 Worklog Time Spent: 10m Work Description: codecov-io commented on issue #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#issuecomment-531069100 Codecov report: merging #2722 into master will decrease coverage by 0.02%; the diff coverage is 1.66%.
```diff
@@            Coverage Diff             @@
##           master    #2722      +/-  ##
=========================================
- Coverage   45.04%   45.01%    -0.03%
- Complexity   8739     8770      +31
  Files        1880     1886       +6
  Lines       70205    70591     +386
  Branches     7707     7739      +32
+ Hits        31623    31779     +156
- Misses      35651    35864     +213
- Partials     2931     2948      +17
```
Most impacted files: SalesforceConfigurationKeys (0%), SalesforceExtractor (0%), SalesforceSource (19.74%, -3.02%), FlowStatusGenerator (82.14%, -7.15%), LocalJobLauncher (61.81%, -2.34%), CouchbaseWriter (64.39%, -1.89%), GobblinClusterManager (53.91%, -0.51%), HiveMetaStoreUtils (31.69%, -0.15%), AdlsDataNode (50%).
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=313876=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-313876 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 17/Sep/19 19:50 Start Date: 17/Sep/19 19:50 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r325340740 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -144,28 +142,18 @@ private final boolean pkChunkingSkipCountCheck; private final boolean bulkApiUseQueryAll; + private WorkUnitState workUnitState; + public SalesforceExtractor(WorkUnitState state) { super(state); -this.sfConnector = (SalesforceConnector) this.connector; - -// don't allow pk chunking if max partitions too high or have user specified partitions -if (state.getPropAsBoolean(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, false) Review comment: This part of the code was only for **PK chunking**. We don't need second-level PK-chunking any more. (If we have a normal pre-partition, we should not use PK chunking; it burns out the request quota.) I created a separate function - getQueryResultIdsPkChunking - for PK chunking and made it not depend on the class member this.pkChunking, therefore removed pkChunking.
Issue Time Tracking --- Worklog Id: (was: 313876) Time Spent: 5h 40m (was: 5.5h)
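The guard being removed in this diff, as the comment explains, used to disable PK chunking when the job already had user-specified partitions or allowed more partitions than a limit, since chunking on top of many partitions burns Bulk API request quota. A sketch of that (now-removed) decision, with an assumed partitions limit:

```java
// Sketch of the removed guard: PK chunking is only allowed when it was
// requested AND the job has no competing partitioning scheme. The limit
// value is an illustrative assumption, not taken from the PR.
public class PkChunkingGuard {
    static final int PK_CHUNKING_MAX_PARTITIONS_LIMIT = 3; // assumed limit

    static boolean allowPkChunking(boolean pkChunkingRequested,
                                   boolean hasUserSpecifiedPartitions,
                                   int maxPartitions) {
        if (!pkChunkingRequested) {
            return false;
        }
        // Never combine PK chunking with user-specified partitions or a
        // high partition count: each chunk costs Bulk API quota.
        return !hasUserSpecifiedPartitions
            && maxPartitions <= PK_CHUNKING_MAX_PARTITIONS_LIMIT;
    }
}
```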
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=313879=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-313879 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 17/Sep/19 19:50 Start Date: 17/Sep/19 19:50 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r325344285 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -779,7 +880,7 @@ public boolean bulkApiLogin() throws Exception { BatchInfoList batchInfoList = this.bulkConnection.getBatchInfoList(this.bulkJob.getId()); if (usingPkChunking && bulkBatchInfo.getState() == BatchStateEnum.NotProcessed) { Review comment: Thanks! removed. Issue Time Tracking --- Worklog Id: (was: 313879) Time Spent: 5h 50m (was: 5h 40m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=313877=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-313877 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 17/Sep/19 19:50 Start Date: 17/Sep/19 19:50 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r325343123 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -779,7 +880,7 @@ public boolean bulkApiLogin() throws Exception { BatchInfoList batchInfoList = this.bulkConnection.getBatchInfoList(this.bulkJob.getId()); Review comment: >> BatchInfoList batchInfoList = this.bulkConnection.getBatchInfoList(this.bulkJob.getId()); I try not to change the existing code in **getQueryResultIds**, and this line is necessary. **getBatchInfoList**'s return value is a list of **BatchIdAndResultId**, i.e. resultIds paired with their corresponding batch Ids. One bulkJobId can have multiple results even when it is not PK-chunking. I am not using **getQueryResultIds** for PK-chunking. If we want to do further code refactoring, let's do it in another ticket.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 313877) Time Spent: 5h 40m (was: 5.5h) > Add feature that enables PK-chunking in partition > -- > > Key: GOBBLIN-865 > URL: https://issues.apache.org/jira/browse/GOBBLIN-865 > Project: Apache Gobblin > Issue Type: Task >Reporter: Alex Li >Priority: Major > Labels: salesforce > Time Spent: 5h 40m > Remaining Estimate: 0h > > In SFDC(salesforce) connector, we have partitioning mechanisms to split a > giant query to multiple sub queries. There are 3 mechanisms: > * simple partition (equally split by time) > * dynamic pre-partition (generate histogram and split by row numbers) > * user specified partition (set up time range in job file) > However there are tables like Task and Contract are failing time to time to > fetch full data. > We may want to utilize PK-chunking to partition the query. > > The pk-chunking doc from SFDC - > [https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/async_api_headers_enable_pk_chunking.htm] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=313875=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-313875 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 17/Sep/19 19:50 Start Date: 17/Sep/19 19:50 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r325354169 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -979,39 +1118,33 @@ private void fetchResultBatchWithRetry(RecordSetList rs) @Override public void closeConnection() throws Exception { if (this.bulkConnection != null -&& !this.bulkConnection.getJobStatus(this.bulkJob.getId()).getState().toString().equals("Closed")) { +&& !this.bulkConnection.getJobStatus(this.getBulkJobId()).getState().toString().equals("Closed")) { log.info("Closing salesforce bulk job connection"); - this.bulkConnection.closeJob(this.bulkJob.getId()); + this.bulkConnection.closeJob(this.getBulkJobId()); } } - public static List constructGetCommand(String restQuery) { + private static List constructGetCommand(String restQuery) { return Arrays.asList(new RestApiCommand().build(Arrays.asList(restQuery), RestApiCommandType.GET)); } /** * Waits for the PK batches to complete. 
The wait will stop after all batches are complete or on the first failed batch * @param batchInfoList list of batch info - * @param retryInterval the polling interval + * @param waitInterval the polling interval * @return the last {@link BatchInfo} processed * @throws InterruptedException * @throws AsyncApiException */ - private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int retryInterval) + private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int waitInterval) throws InterruptedException, AsyncApiException { BatchInfo batchInfo = null; BatchInfo[] batchInfos = batchInfoList.getBatchInfo(); -// Wait for all batches other than the first one. The first one is not processed in PK chunking mode -for (int i = 1; i < batchInfos.length; i++) { - BatchInfo bi = batchInfos[i]; - - // get refreshed job status - bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId()); - - while ((bi.getState() != BatchStateEnum.Completed) - && (bi.getState() != BatchStateEnum.Failed)) { -Thread.sleep(retryInterval * 1000); +for (BatchInfo bi: batchInfos) { + BatchStateEnum state = bi.getState(); + while (state != BatchStateEnum.Completed && state != BatchStateEnum.Failed && state != BatchStateEnum.NotProcessed) { +Thread.sleep(waitInterval * 1000); bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId()); Review comment: Good catch, thanks! I should not use the state variable. My test happened to work because I was stepping through with a breakpoint, which gave SFDC enough time to finish. I did more refactoring for this part and am pushing the code shortly; I will test today and update you. Thanks! Issue Time Tracking --- Worklog Id: (was: 313875) Time Spent: 5h 40m (was: 5.5h)
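The bug discussed in this review thread — caching `getState()` in a local variable so the wait loop never sees the batch complete — can be sketched as a runnable example. `FakeBulkConnection`, `BatchState`, and the class names here are hypothetical stand-ins for the Salesforce Bulk API types; only the shape of the corrected loop mirrors the PR:

```java
import java.util.List;

public class PkBatchPollingSketch {

  public enum BatchState { Queued, InProgress, Completed, Failed, NotProcessed }

  public static class BatchInfo {
    public final String id;
    public final BatchState state;
    public BatchInfo(String id, BatchState state) { this.id = id; this.state = state; }
  }

  // Simulates BulkConnection.getBatchInfo: a batch reports Completed after
  // it has been polled twice.
  public static class FakeBulkConnection {
    private int polls = 0;
    public BatchInfo getBatchInfo(String jobId, String batchId) {
      polls++;
      return new BatchInfo(batchId, polls >= 2 ? BatchState.Completed : BatchState.InProgress);
    }
  }

  // Waits for all PK batches. The parent batch stays NotProcessed in
  // PK-chunking mode, so NotProcessed is treated as terminal and skipped.
  public static BatchInfo waitForPkBatches(List<BatchInfo> batches, FakeBulkConnection conn, String jobId) {
    BatchInfo last = null;
    for (BatchInfo bi : batches) {
      while (bi.state != BatchState.Completed
          && bi.state != BatchState.Failed
          && bi.state != BatchState.NotProcessed) {
        // Refresh the loop variable itself each iteration -- this is the fix:
        // testing a cached state would spin forever once a batch is InProgress.
        bi = conn.getBatchInfo(jobId, bi.id);
      }
      last = bi;
      if (bi.state == BatchState.Failed) {
        break; // stop on the first failed batch
      }
    }
    return last;
  }
}
```

The real method also sleeps `waitInterval` seconds between polls; the sketch omits the sleep so it runs instantly.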
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=313878=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-313878 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 17/Sep/19 19:50 Start Date: 17/Sep/19 19:50 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r325346830 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -979,39 +1118,33 @@ private void fetchResultBatchWithRetry(RecordSetList rs) @Override public void closeConnection() throws Exception { if (this.bulkConnection != null -&& !this.bulkConnection.getJobStatus(this.bulkJob.getId()).getState().toString().equals("Closed")) { +&& !this.bulkConnection.getJobStatus(this.getBulkJobId()).getState().toString().equals("Closed")) { log.info("Closing salesforce bulk job connection"); - this.bulkConnection.closeJob(this.bulkJob.getId()); + this.bulkConnection.closeJob(this.getBulkJobId()); } } - public static List constructGetCommand(String restQuery) { + private static List constructGetCommand(String restQuery) { return Arrays.asList(new RestApiCommand().build(Arrays.asList(restQuery), RestApiCommandType.GET)); } /** * Waits for the PK batches to complete. 
The wait will stop after all batches are complete or on the first failed batch * @param batchInfoList list of batch info - * @param retryInterval the polling interval + * @param waitInterval the polling interval * @return the last {@link BatchInfo} processed * @throws InterruptedException * @throws AsyncApiException */ - private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int retryInterval) + private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int waitInterval) throws InterruptedException, AsyncApiException { BatchInfo batchInfo = null; BatchInfo[] batchInfos = batchInfoList.getBatchInfo(); -// Wait for all batches other than the first one. The first one is not processed in PK chunking mode -for (int i = 1; i < batchInfos.length; i++) { - BatchInfo bi = batchInfos[i]; - - // get refreshed job status - bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId()); - - while ((bi.getState() != BatchStateEnum.Completed) - && (bi.getState() != BatchStateEnum.Failed)) { -Thread.sleep(retryInterval * 1000); +for (BatchInfo bi: batchInfos) { Review comment: I had a ticket for this - https://jira01.corp.linkedin.com:8443/browse/DSS-1. Pk-chunking uses this function, and sometimes the parent may not be the first element in the list (see the screenshot in the ticket). Issue Time Tracking --- Worklog Id: (was: 313878) Time Spent: 5h 40m (was: 5.5h)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=313802=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-313802 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 17/Sep/19 17:08 Start Date: 17/Sep/19 17:08 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r325283786 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -979,39 +1118,33 @@ private void fetchResultBatchWithRetry(RecordSetList rs) @Override public void closeConnection() throws Exception { if (this.bulkConnection != null -&& !this.bulkConnection.getJobStatus(this.bulkJob.getId()).getState().toString().equals("Closed")) { +&& !this.bulkConnection.getJobStatus(this.getBulkJobId()).getState().toString().equals("Closed")) { log.info("Closing salesforce bulk job connection"); - this.bulkConnection.closeJob(this.bulkJob.getId()); + this.bulkConnection.closeJob(this.getBulkJobId()); } } - public static List constructGetCommand(String restQuery) { + private static List constructGetCommand(String restQuery) { return Arrays.asList(new RestApiCommand().build(Arrays.asList(restQuery), RestApiCommandType.GET)); } /** * Waits for the PK batches to complete. 
The wait will stop after all batches are complete or on the first failed batch * @param batchInfoList list of batch info - * @param retryInterval the polling interval + * @param waitInterval the polling interval * @return the last {@link BatchInfo} processed * @throws InterruptedException * @throws AsyncApiException */ - private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int retryInterval) + private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int waitInterval) throws InterruptedException, AsyncApiException { BatchInfo batchInfo = null; BatchInfo[] batchInfos = batchInfoList.getBatchInfo(); -// Wait for all batches other than the first one. The first one is not processed in PK chunking mode -for (int i = 1; i < batchInfos.length; i++) { - BatchInfo bi = batchInfos[i]; - - // get refreshed job status - bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId()); - - while ((bi.getState() != BatchStateEnum.Completed) - && (bi.getState() != BatchStateEnum.Failed)) { -Thread.sleep(retryInterval * 1000); +for (BatchInfo bi: batchInfos) { + BatchStateEnum state = bi.getState(); + while (state != BatchStateEnum.Completed && state != BatchStateEnum.Failed && state != BatchStateEnum.NotProcessed) { +Thread.sleep(waitInterval * 1000); bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId()); Review comment: You are assigning to the outside loop variable. Not sure if this is intentional. Also, I don't see state being updated, so won't this loop forever? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 313802) Time Spent: 5h (was: 4h 50m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=313804=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-313804 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 17/Sep/19 17:08 Start Date: 17/Sep/19 17:08 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r325281910 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -779,7 +880,7 @@ public boolean bulkApiLogin() throws Exception { BatchInfoList batchInfoList = this.bulkConnection.getBatchInfoList(this.bulkJob.getId()); Review comment: Remove this? Issue Time Tracking --- Worklog Id: (was: 313804) Time Spent: 5h 10m (was: 5h)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=313807=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-313807 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 17/Sep/19 17:08 Start Date: 17/Sep/19 17:08 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r325267416 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -779,7 +880,7 @@ public boolean bulkApiLogin() throws Exception { BatchInfoList batchInfoList = this.bulkConnection.getBatchInfoList(this.bulkJob.getId()); if (usingPkChunking && bulkBatchInfo.getState() == BatchStateEnum.NotProcessed) { Review comment: Remove this block too since usingPkChunking is no longer set. Issue Time Tracking --- Worklog Id: (was: 313807) Time Spent: 5.5h (was: 5h 20m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=313803=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-313803 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 17/Sep/19 17:08 Start Date: 17/Sep/19 17:08 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r325284644 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -144,28 +142,18 @@ private final boolean pkChunkingSkipCountCheck; private final boolean bulkApiUseQueryAll; + private WorkUnitState workUnitState; + public SalesforceExtractor(WorkUnitState state) { super(state); -this.sfConnector = (SalesforceConnector) this.connector; - -// don't allow pk chunking if max partitions too high or have user specified partitions -if (state.getPropAsBoolean(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, false) Review comment: How come this check was removed? If the user has specified partitions then that should override PK chunking. Issue Time Tracking --- Worklog Id: (was: 313803) Time Spent: 5h 10m (was: 5h)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=313805=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-313805 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 17/Sep/19 17:08 Start Date: 17/Sep/19 17:08 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r325264698 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -144,28 +142,18 @@ private final boolean pkChunkingSkipCountCheck; private final boolean bulkApiUseQueryAll; + private WorkUnitState workUnitState; + public SalesforceExtractor(WorkUnitState state) { super(state); -this.sfConnector = (SalesforceConnector) this.connector; - -// don't allow pk chunking if max partitions too high or have user specified partitions -if (state.getPropAsBoolean(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, false) -|| state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS, -ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS) > PK_CHUNKING_MAX_PARTITIONS_LIMIT) { - if (state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false)) { -log.warn("Max partitions too high, so PK chunking is not enabled"); - } - - this.pkChunking = false; -} else { - this.pkChunking = state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false); -} +this.workUnitState = state; +this.sfConnector = (SalesforceConnector) this.connector; this.pkChunkingSize = Math.max(MIN_PK_CHUNKING_SIZE, -Math.min(MAX_PK_CHUNKING_SIZE, state.getPropAsInt(PK_CHUNKING_SIZE_KEY, DEFAULT_PK_CHUNKING_SIZE))); +Math.min(MAX_PK_CHUNKING_SIZE, state.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, DEFAULT_PK_CHUNKING_SIZE))); -this.pkChunkingSkipCountCheck = state.getPropAsBoolean(PK_CHUNKING_SKIP_COUNT_CHECK, DEFAULT_PK_CHUNKING_SKIP_COUNT_CHECK); +this.pkChunkingSkipCountCheck = true; // won't be able to get count Review comment: Should remove this since the config is no longer used. Issue Time Tracking --- Worklog Id: (was: 313805) Time Spent: 5h 10m (was: 5h)
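The chunk-size handling in the diff above clamps the configured value into a fixed range with `Math.max(MIN, Math.min(MAX, configured))`. A minimal sketch of that logic — the bound and default values here are assumptions for illustration (Salesforce documents a default chunk size of 100,000 and a maximum of 250,000; the connector's actual `MIN_PK_CHUNKING_SIZE` constant may differ):

```java
public class PkChunkingSizeSketch {
  // Assumed bounds for the sketch, not the connector's actual constants.
  static final int MIN_PK_CHUNKING_SIZE = 1000;
  static final int MAX_PK_CHUNKING_SIZE = 250_000;
  static final int DEFAULT_PK_CHUNKING_SIZE = 100_000;

  // Mirrors the constructor logic: fall back to the default when the
  // property is unset, then clamp into [MIN, MAX].
  static int clampChunkSize(Integer configured) {
    int size = (configured == null) ? DEFAULT_PK_CHUNKING_SIZE : configured;
    return Math.max(MIN_PK_CHUNKING_SIZE, Math.min(MAX_PK_CHUNKING_SIZE, size));
  }
}
```

Clamping rather than rejecting out-of-range values keeps a misconfigured job running with the nearest legal chunk size instead of failing at startup.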
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=313806=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-313806 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 17/Sep/19 17:08 Start Date: 17/Sep/19 17:08 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r325283088 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -979,39 +1118,33 @@ private void fetchResultBatchWithRetry(RecordSetList rs) @Override public void closeConnection() throws Exception { if (this.bulkConnection != null -&& !this.bulkConnection.getJobStatus(this.bulkJob.getId()).getState().toString().equals("Closed")) { +&& !this.bulkConnection.getJobStatus(this.getBulkJobId()).getState().toString().equals("Closed")) { log.info("Closing salesforce bulk job connection"); - this.bulkConnection.closeJob(this.bulkJob.getId()); + this.bulkConnection.closeJob(this.getBulkJobId()); } } - public static List constructGetCommand(String restQuery) { + private static List constructGetCommand(String restQuery) { return Arrays.asList(new RestApiCommand().build(Arrays.asList(restQuery), RestApiCommandType.GET)); } /** * Waits for the PK batches to complete. 
The wait will stop after all batches are complete or on the first failed batch * @param batchInfoList list of batch info - * @param retryInterval the polling interval + * @param waitInterval the polling interval * @return the last {@link BatchInfo} processed * @throws InterruptedException * @throws AsyncApiException */ - private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int retryInterval) + private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int waitInterval) throws InterruptedException, AsyncApiException { BatchInfo batchInfo = null; BatchInfo[] batchInfos = batchInfoList.getBatchInfo(); -// Wait for all batches other than the first one. The first one is not processed in PK chunking mode -for (int i = 1; i < batchInfos.length; i++) { - BatchInfo bi = batchInfos[i]; - - // get refreshed job status - bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId()); - - while ((bi.getState() != BatchStateEnum.Completed) - && (bi.getState() != BatchStateEnum.Failed)) { -Thread.sleep(retryInterval * 1000); +for (BatchInfo bi: batchInfos) { Review comment: Is it no longer true that the first BatchInfo is for the parent and not the pk chunking batches or are you considering it safe to apply the same check since the state should be completed? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 313806) Time Spent: 5h 20m (was: 5h 10m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=312446=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-312446 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 13/Sep/19 23:45 Start Date: 13/Sep/19 23:45 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r324393059 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java ## @@ -146,12 +156,101 @@ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity @Override protected List generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) { +String partitionType = state.getProp(PARTITION_TYPE, ""); +if (partitionType.equals("PK_CHUNKING")) { + // pk-chunking only supports start-time by source.querybased.start.value, and does not support end-time. + // always ingest data later than or equal source.querybased.start.value. 
+ // we should generate pk-chunking work units only in the case of snapshot/full ingestion + return generateWorkUnitsPkChunking(sourceEntity, state, previousWatermark); +} else { + return generateWorkUnitsStrategy(sourceEntity, state, previousWatermark); +} + } + + /** + * generate workUnit with noQuery=true + */ + private List generateWorkUnitsPkChunking(SourceEntity sourceEntity, SourceState state, long previousWatermark) { + SalesforceBulkJobId salesforceBulkJobId = executeQueryWithPkChunking(state, previousWatermark); + List ret = createWorkUnits(sourceEntity, state, salesforceBulkJobId); + return ret; + } + + private SalesforceBulkJobId executeQueryWithPkChunking( + SourceState sourceState, + long previousWatermark + ) throws RuntimeException { +State state = new State(sourceState); +WorkUnit workUnit = WorkUnit.createEmpty(); +try { + WorkUnitState workUnitState = new WorkUnitState(workUnit, state); + workUnitState.setId("Execute pk-chunking"); Review comment: Hi @zxcware, is this OK? I am trying to set an id for the workUnit. Issue Time Tracking --- Worklog Id: (was: 312446) Time Spent: 4h 50m (was: 4h 40m)
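The dispatch added to generateWorkUnits in the diff above can be summarized in a small sketch. The constant and method names echo the diff, but the bodies here are placeholders, not the PR's implementation:

```java
public class PartitionTypeDispatchSketch {

  // Echoes the PARTITION_TYPE property value checked in the diff.
  static final String PK_CHUNKING = "PK_CHUNKING";

  // Mirrors the branch in generateWorkUnits: PK_CHUNKING pre-executes the
  // bulk job and builds work units from its batch/result ids; any other
  // value falls through to the existing query-based partitioning strategies.
  static String chooseWorkUnitPath(String partitionType) {
    if (PK_CHUNKING.equals(partitionType)) {
      return "pk-chunking"; // generateWorkUnitsPkChunking in the diff
    }
    return "query-based";   // simple / dynamic pre-partition / user-specified
  }
}
```

Putting the branch at the top of generateWorkUnits keeps the new path opt-in: jobs that never set the partition-type property keep their current behavior.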
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=312419=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-312419 ]

ASF GitHub Bot logged work on GOBBLIN-865:
------------------------------------------
        Author: ASF GitHub Bot
    Created on: 13/Sep/19 22:53
    Start Date: 13/Sep/19 22:53
    Worklog Time Spent: 10m

Work Description: codecov-io commented on issue #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#issuecomment-531069100

# [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=h1) Report
> Merging [#2722](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/9bf9a882427e98e7f4ef089c4ca1bde42f4b36a3?src=pr=desc) will **decrease** coverage by `0.08%`.
> The diff coverage is `1.78%`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/graphs/tree.svg?width=650=4MgURJ0bGc=150=pr)](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=tree)

```diff
@@            Coverage Diff             @@
##           master    #2722      +/-   ##
==========================================
- Coverage   45.04%   44.96%    -0.09%
- Complexity    8739     8752      +13
  Files         1880     1884       +4
  Lines        70205    70454     +249
  Branches      7707     7730      +23
+ Hits         31623    31678      +55
- Misses       35651    35831     +180
- Partials      2931     2945      +14
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...obblin/salesforce/SalesforceConfigurationKeys.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUNvbmZpZ3VyYXRpb25LZXlzLmphdmE=) | `0% <ø> (ø)` | `0 <0> (ø)` | :arrow_down: |
[...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: | | [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `19.74% <5.66%> (-3.02%)` | `12 <1> (+1)` | | | [...obblin/service/monitoring/FlowStatusGenerator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9uaXRvcmluZy9GbG93U3RhdHVzR2VuZXJhdG9yLmphdmE=) | `82.14% <0%> (-7.15%)` | `11% <0%> (-1%)` | | | [...apache/gobblin/runtime/local/LocalJobLauncher.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvbG9jYWwvTG9jYWxKb2JMYXVuY2hlci5qYXZh) | `61.81% <0%> (-2.34%)` | `5% <0%> (ø)` | | | [...e/gobblin/runtime/locks/ZookeeperBasedJobLock.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvbG9ja3MvWm9va2VlcGVyQmFzZWRKb2JMb2NrLmphdmE=) | `63.33% <0%> (-0.72%)` | `15% <0%> (ø)` | | | [...che/gobblin/hive/metastore/HiveMetaStoreUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1oaXZlLXJlZ2lzdHJhdGlvbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9oaXZlL21ldGFzdG9yZS9IaXZlTWV0YVN0b3JlVXRpbHMuamF2YQ==) | `31.69% <0%> (-0.15%)` | `12% <0%> (ø)` | | | 
[...e/modules/flowgraph/datanodes/fs/AdlsDataNode.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9mbG93Z3JhcGgvZGF0YW5vZGVzL2ZzL0FkbHNEYXRhTm9kZS5qYXZh) | `50% <0%> (ø)` | `2% <0%> (ø)` | :arrow_down: | | [...in/source/extractor/extract/kafka/KafkaSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZXh0cmFjdC9rYWZrYS9LYWZrYVNvdXJjZS5qYXZh) | `0% <0%> (ø)` | `0% <0%> (ø)` | :arrow_down: | |
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=312402=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-312402 ]

ASF GitHub Bot logged work on GOBBLIN-865:
------------------------------------------
        Author: ASF GitHub Bot
    Created on: 13/Sep/19 22:17
    Start Date: 13/Sep/19 22:17
    Worklog Time Spent: 10m

Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r324385831

## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java

```diff
@@ -146,12 +156,98 @@ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity
   @Override
   protected List generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) {
+    String partitionType = state.getProp(PARTITION_TYPE, "");
+    if (partitionType.equals("PK_CHUNKING")) {
+      return generateWorkUnitsPkChunking(sourceEntity, state, previousWatermark);
```

Review comment: Thanks, will add it!

Issue Time Tracking
-------------------
    Worklog Id:     (was: 312402)
    Time Spent: 4.5h  (was: 4h 20m)
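For context, the dispatch above is driven by job-file configuration. A hypothetical job-file fragment — the actual key strings behind `PARTITION_TYPE` and the chunk-size setting are defined in `SalesforceConfigurationKeys` and may differ, while `source.entity` and `source.querybased.start.value` are standard Gobblin keys, the latter giving the inclusive start time that PK-chunking supports:

```properties
# Hypothetical sketch -- the partition-type and chunk-size key names are assumptions.
source.entity=Task
source.querybased.start.value=2014-01-01T00:00:00.000Z
salesforce.partition.type=PK_CHUNKING
salesforce.partition.pkChunkingSize=100000
```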
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=312400=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-312400 ]

ASF GitHub Bot logged work on GOBBLIN-865:
------------------------------------------
        Author: ASF GitHub Bot
    Created on: 13/Sep/19 22:13
    Start Date: 13/Sep/19 22:13
    Worklog Time Spent: 10m

Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r324385070

## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java

```diff
@@ -588,11 +576,41 @@ public String getTimestampPredicateCondition(String column, long value, String v
     return dataTypeMap;
   }
 
+  private String partitionPkChunkingJobId = null;
+  private Iterator partitionPkChunkingBatchIdResultIterator = null;
+
+  private Iterator getRecordSetPkchunking(WorkUnit workUnit) throws RuntimeException {
+    if (partitionPkChunkingBatchIdResultIterator == null) {
+      partitionPkChunkingJobId = workUnit.getProp(PK_CHUNKING_JOB_ID);
+      partitionPkChunkingBatchIdResultIterator = Arrays.stream(workUnit.getProp(PK_CHUNKING_BATCH_RESULT_IDS).split(",")).iterator();
+    }
+    if (!partitionPkChunkingBatchIdResultIterator.hasNext()) {
+      return null;
+    }
+    try {
+      if (!bulkApiLogin()) {
+        throw new IllegalArgumentException("Invalid Login");
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    String[] batchIdResultIdArray = partitionPkChunkingBatchIdResultIterator.next().split(":");
+    String batchId = batchIdResultIdArray[0];
+    String resultId = batchIdResultIdArray[1];
+    List rs = fetchPkChunkingResultSetWithRetry(bulkConnection, partitionPkChunkingJobId, batchId, resultId, fetchRetryLimit);
+    return rs.iterator();
+  }
+
   @Override
   public Iterator getRecordSetFromSourceApi(String schema, String entity, WorkUnit workUnit, List predicateList) throws IOException {
     log.debug("Getting salesforce data using bulk api");
-    RecordSet rs = null;
+
+    // new version of extractor: bulk api with pk-chunking in pre-partitioning of SalesforceSource
+    if (!workUnit.getProp(PK_CHUNKING_JOB_ID, "").equals("")) {
```

Review comment: Thanks! Will do.

Issue Time Tracking
-------------------
    Worklog Id:     (was: 312400)
    Time Spent: 4h 20m  (was: 4h 10m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=312399=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-312399 ]

ASF GitHub Bot logged work on GOBBLIN-865:
------------------------------------------
        Author: ASF GitHub Bot
    Created on: 13/Sep/19 22:09
    Start Date: 13/Sep/19 22:09
    Worklog Time Spent: 10m

Work Description: zxcware commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r324384004

## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java

```diff
@@ -146,12 +156,98 @@ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity
   @Override
   protected List generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) {
+    String partitionType = state.getProp(PARTITION_TYPE, "");
+    if (partitionType.equals("PK_CHUNKING")) {
+      return generateWorkUnitsPkChunking(sourceEntity, state, previousWatermark);
```

Review comment: This is where to comment the right use cases to enable `PK_CHUNKING`, as it's the entry point:

```
// pk-chunking only supports start-time by source.querybased.start.value, and does not support end-time.
// always ingest data later than or equal source.querybased.start.value.
// we should only pk chunking based work units only in case of snapshot/full ingestion
```

Issue Time Tracking
-------------------
    Worklog Id:     (was: 312399)
    Time Spent: 4h 10m  (was: 4h)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=312398=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-312398 ]

ASF GitHub Bot logged work on GOBBLIN-865:
------------------------------------------
        Author: ASF GitHub Bot
    Created on: 13/Sep/19 22:09
    Start Date: 13/Sep/19 22:09
    Worklog Time Spent: 10m

Work Description: zxcware commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r324383638

## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java

```diff
@@ -588,11 +576,41 @@ public String getTimestampPredicateCondition(String column, long value, String v
     return dataTypeMap;
   }
 
+  private String partitionPkChunkingJobId = null;
+  private Iterator partitionPkChunkingBatchIdResultIterator = null;
+
+  private Iterator getRecordSetPkchunking(WorkUnit workUnit) throws RuntimeException {
+    if (partitionPkChunkingBatchIdResultIterator == null) {
+      partitionPkChunkingJobId = workUnit.getProp(PK_CHUNKING_JOB_ID);
+      partitionPkChunkingBatchIdResultIterator = Arrays.stream(workUnit.getProp(PK_CHUNKING_BATCH_RESULT_IDS).split(",")).iterator();
+    }
+    if (!partitionPkChunkingBatchIdResultIterator.hasNext()) {
+      return null;
+    }
+    try {
+      if (!bulkApiLogin()) {
+        throw new IllegalArgumentException("Invalid Login");
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    String[] batchIdResultIdArray = partitionPkChunkingBatchIdResultIterator.next().split(":");
+    String batchId = batchIdResultIdArray[0];
+    String resultId = batchIdResultIdArray[1];
+    List rs = fetchPkChunkingResultSetWithRetry(bulkConnection, partitionPkChunkingJobId, batchId, resultId, fetchRetryLimit);
+    return rs.iterator();
+  }
+
   @Override
   public Iterator getRecordSetFromSourceApi(String schema, String entity, WorkUnit workUnit, List predicateList) throws IOException {
     log.debug("Getting salesforce data using bulk api");
-    RecordSet rs = null;
+
+    // new version of extractor: bulk api with pk-chunking in pre-partitioning of SalesforceSource
+    if (!workUnit.getProp(PK_CHUNKING_JOB_ID, "").equals("")) {
```

Review comment: Are you checking if `PK_CHUNKING_JOB_ID` exists in the workUnit? If so, we can use `workUnit.contains(PK_CHUNKING_JOB_ID)`.

Issue Time Tracking
-------------------
    Worklog Id:     (was: 312398)
    Time Spent: 4h 10m  (was: 4h)
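The diff under review encodes the PK-chunking batches as a comma-separated list of `batchId:resultId` tokens in `PK_CHUNKING_BATCH_RESULT_IDS`, which the extractor then walks one pair at a time. A standalone sketch of that encoding — the `BatchResultPair` type and method names are hypothetical; the connector does this splitting inline:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the "batchId:resultId,batchId:resultId,..." property format used by
// the PK-chunking work units; not the connector's actual code.
public final class BatchResultPair {
  public final String batchId;
  public final String resultId;

  public BatchResultPair(String batchId, String resultId) {
    this.batchId = batchId;
    this.resultId = resultId;
  }

  // Splits the comma-separated property value into (batchId, resultId) pairs,
  // rejecting tokens that are not exactly "batchId:resultId".
  public static List<BatchResultPair> parseAll(String prop) {
    List<BatchResultPair> pairs = new ArrayList<>();
    for (String token : prop.split(",")) {
      String[] parts = token.split(":");
      if (parts.length != 2) {
        throw new IllegalArgumentException("Malformed batch:result token: " + token);
      }
      pairs.add(new BatchResultPair(parts[0], parts[1]));
    }
    return pairs;
  }
}
```

Keeping the pairs in one string property lets a single work unit fetch several PK-chunking batches sequentially, which is why the extractor holds an iterator over the parsed tokens.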
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=312359=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-312359 ]

ASF GitHub Bot logged work on GOBBLIN-865:
------------------------------------------
        Author: ASF GitHub Bot
    Created on: 13/Sep/19 21:44
    Start Date: 13/Sep/19 21:44
    Worklog Time Spent: 10m

Work Description: codecov-io commented on issue #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#issuecomment-531069100

# [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=h1) Report
> Merging [#2722](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/9bf9a882427e98e7f4ef089c4ca1bde42f4b36a3?src=pr=desc) will **decrease** coverage by `0.07%`.
> The diff coverage is `1.78%`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/graphs/tree.svg?width=650=4MgURJ0bGc=150=pr)](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=tree)

```diff
@@            Coverage Diff             @@
##           master    #2722      +/-   ##
==========================================
- Coverage   45.04%   44.96%    -0.08%
- Complexity    8739     8753      +14
  Files         1880     1884       +4
  Lines        70205    70454     +249
  Branches      7707     7730      +23
+ Hits         31623    31680      +57
- Misses       35651    35831     +180
- Partials      2931     2943      +12
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...obblin/salesforce/SalesforceConfigurationKeys.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUNvbmZpZ3VyYXRpb25LZXlzLmphdmE=) | `0% <ø> (ø)` | `0 <0> (ø)` | :arrow_down: |
[...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: | | [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `19.74% <5.66%> (-3.02%)` | `12 <1> (+1)` | | | [...obblin/service/monitoring/FlowStatusGenerator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9uaXRvcmluZy9GbG93U3RhdHVzR2VuZXJhdG9yLmphdmE=) | `82.14% <0%> (-7.15%)` | `11% <0%> (-1%)` | | | [...apache/gobblin/runtime/local/LocalJobLauncher.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvbG9jYWwvTG9jYWxKb2JMYXVuY2hlci5qYXZh) | `61.81% <0%> (-2.34%)` | `5% <0%> (ø)` | | | [...e/gobblin/runtime/locks/ZookeeperBasedJobLock.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvbG9ja3MvWm9va2VlcGVyQmFzZWRKb2JMb2NrLmphdmE=) | `63.33% <0%> (-0.72%)` | `15% <0%> (ø)` | | | [...che/gobblin/hive/metastore/HiveMetaStoreUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1oaXZlLXJlZ2lzdHJhdGlvbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9oaXZlL21ldGFzdG9yZS9IaXZlTWV0YVN0b3JlVXRpbHMuamF2YQ==) | `31.69% <0%> (-0.15%)` | `12% <0%> (ø)` | | | 
[...e/modules/flowgraph/datanodes/fs/AdlsDataNode.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9mbG93Z3JhcGgvZGF0YW5vZGVzL2ZzL0FkbHNEYXRhTm9kZS5qYXZh) | `50% <0%> (ø)` | `2% <0%> (ø)` | :arrow_down: | | [...in/source/extractor/extract/kafka/KafkaSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZXh0cmFjdC9rYWZrYS9LYWZrYVNvdXJjZS5qYXZh) | `0% <0%> (ø)` | `0% <0%> (ø)` | :arrow_down: | |
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=311839=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-311839 ]

ASF GitHub Bot logged work on GOBBLIN-865:
------------------------------------------
        Author: ASF GitHub Bot
    Created on: 13/Sep/19 01:50
    Start Date: 13/Sep/19 01:50
    Worklog Time Spent: 10m

Work Description: codecov-io commented on issue #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#issuecomment-531069100

# [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=h1) Report
> Merging [#2722](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/9bf9a882427e98e7f4ef089c4ca1bde42f4b36a3?src=pr=desc) will **decrease** coverage by `0.07%`.
> The diff coverage is `1.78%`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/graphs/tree.svg?width=650=4MgURJ0bGc=150=pr)](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=tree)

```diff
@@            Coverage Diff             @@
##           master    #2722      +/-   ##
==========================================
- Coverage   45.04%   44.96%    -0.08%
- Complexity    8739     8754      +15
  Files         1880     1884       +4
  Lines        70205    70454     +249
  Branches      7707     7730      +23
+ Hits         31623    31681      +58
- Misses       35651    35831     +180
- Partials      2931     2942      +11
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2722?src=pr=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...obblin/salesforce/SalesforceConfigurationKeys.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUNvbmZpZ3VyYXRpb25LZXlzLmphdmE=) | `0% <ø> (ø)` | `0 <0> (ø)` | :arrow_down: |
[...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: | | [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `19.74% <5.66%> (-3.02%)` | `12 <1> (+1)` | | | [...obblin/service/monitoring/FlowStatusGenerator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9uaXRvcmluZy9GbG93U3RhdHVzR2VuZXJhdG9yLmphdmE=) | `82.14% <0%> (-7.15%)` | `11% <0%> (-1%)` | | | [...apache/gobblin/runtime/local/LocalJobLauncher.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvbG9jYWwvTG9jYWxKb2JMYXVuY2hlci5qYXZh) | `61.81% <0%> (-2.34%)` | `5% <0%> (ø)` | | | [...e/gobblin/runtime/locks/ZookeeperBasedJobLock.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvbG9ja3MvWm9va2VlcGVyQmFzZWRKb2JMb2NrLmphdmE=) | `63.33% <0%> (-0.72%)` | `15% <0%> (ø)` | | | [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `64.31% <0%> (-0.47%)` | `28% <0%> (-1%)` | | | 
[...che/gobblin/hive/metastore/HiveMetaStoreUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1oaXZlLXJlZ2lzdHJhdGlvbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9oaXZlL21ldGFzdG9yZS9IaXZlTWV0YVN0b3JlVXRpbHMuamF2YQ==) | `31.69% <0%> (-0.15%)` | `12% <0%> (ø)` | | | [...e/modules/flowgraph/datanodes/fs/AdlsDataNode.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2722/diff?src=pr=tree#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9mbG93Z3JhcGgvZGF0YW5vZGVzL2ZzL0FkbHNEYXRhTm9kZS5qYXZh) | `50% <0%> (ø)` | `2% <0%> (ø)` | :arrow_down: | |
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=309120=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-309120 ]

ASF GitHub Bot logged work on GOBBLIN-865:
------------------------------------------
        Author: ASF GitHub Bot
    Created on: 09/Sep/19 17:50
    Start Date: 09/Sep/19 17:50
    Worklog Time Spent: 10m

Work Description: htran1 commented on issue #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#issuecomment-529592519

@arekusuri, can you look at the Travis build failures?

Issue Time Tracking
-------------------
    Worklog Id:     (was: 309120)
    Time Spent: 3h 40m  (was: 3.5h)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=308141=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-308141 ]

ASF GitHub Bot logged work on GOBBLIN-865:
------------------------------------------
        Author: ASF GitHub Bot
    Created on: 06/Sep/19 20:39
    Start Date: 06/Sep/19 20:39
    Worklog Time Spent: 10m

Work Description: zxcware commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r321901956

## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java

```diff
@@ -896,6 +927,43 @@ private void reinitializeBufferedReader() throws IOException, AsyncApiException
     }
   }
 
+  private List fetchPkChunkingResultSetWithRetry(
```

Review comment: I see, it's existing logic. Then that's fine.

Issue Time Tracking
-------------------
    Worklog Id:     (was: 308141)
    Time Spent: 3.5h  (was: 3h 20m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=308119=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-308119 ]

ASF GitHub Bot logged work on GOBBLIN-865:
Author: ASF GitHub Bot
Created on: 06/Sep/19 19:47
Start Date: 06/Sep/19 19:47
Worklog Time Spent: 10m

Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r321884734

## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java

```java
@Override
protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) {
  String partitionType = state.getProp(PARTITION_TYPE, "PK_CHUNKING");
  if (partitionType.equals("PK_CHUNKING")) {
    return generateWorkUnitsPkChunking(sourceEntity, state, previousWatermark);
  } else {
    return generateWorkUnitsStrategy(sourceEntity, state, previousWatermark);
  }
}

/**
 * Generate work units with noQuery=true
 */
private List<WorkUnit> generateWorkUnitsPkChunking(SourceEntity sourceEntity, SourceState state, long previousWatermark) {
  List<BatchIdAndResultId> batchIdAndResultIds = executeQueryWithPkChunking(state, previousWatermark);
  return createWorkUnits(sourceEntity, state, batchIdAndResultIds);
}

private List<BatchIdAndResultId> executeQueryWithPkChunking(SourceState sourceState, long previousWatermark)
    throws RuntimeException {
  Properties commonProperties = sourceState.getCommonProperties();
  Properties specProperties = sourceState.getSpecProperties();
  State state = new State();
  state.setProps(commonProperties, specProperties);
  WorkUnit workUnit = WorkUnit.createEmpty();
  try {
    WorkUnitState workUnitState = new WorkUnitState(workUnit, state);
    workUnitState.setId("test" + new Random().nextInt());
    workUnitState.setProp(ENABLE_PK_CHUNKING_KEY, true); // have the extractor enable PK chunking
    int chunkSize = workUnitState.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, DEFAULT_PK_CHUNKING_SIZE);
    workUnitState.setProp(PK_CHUNKING_SIZE_KEY, chunkSize); // set the extractor's PK chunking size
    workUnitState.setProp(PK_CHUNKING_SKIP_COUNT_CHECK, true); // skip the count check, since we cannot get a count
    SalesforceExtractor salesforceExtractor = (SalesforceExtractor) this.getExtractor(workUnitState);
    String entity = sourceState.getProp(ConfigurationKeys.SOURCE_ENTITY);
    Partitioner partitioner = new Partitioner(sourceState);
    if (isEarlyStopEnabled(state) && partitioner.isFullDump()) {
      throw new UnsupportedOperationException("Early stop mode cannot work with full dump mode.");
    }
    Partition partition = partitioner.getGlobalPartition(previousWatermark);
    String condition = "";
```

Review comment: We are using **>=**. It is the start time, and it should be inclusive. We don't need an end time in our case; we usually fetch the whole table. In case we want to start from a specific time, users can set **source.querybased.start.value**.

Issue Time Tracking --- Worklog Id: (was: 308119) Time Spent: 3h 20m (was: 3h 10m)
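The inclusive low-watermark condition discussed above (`field >= lowWatermarkDate`, with no end time) can be sketched as a standalone helper. This is an illustrative sketch, not the connector's code: the `SystemModstamp` field name used in the test and the timestamp pattern below are assumptions, while the real connector reads the delta field from `ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY` and the format from `SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT`.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class WatermarkCondition {
  // Assumed SOQL-style UTC timestamp pattern for illustration only.
  static final String TS_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";

  /**
   * Build an inclusive start-time condition such as
   * "SystemModstamp >= 2019-09-01T00:00:00.000Z".
   * Returns "" when either piece is missing, mirroring the PR's behavior of
   * falling back to an unconditioned (full-table) query.
   */
  static String buildCondition(String deltaField, Date startDate) {
    if (deltaField == null || startDate == null) {
      return "";
    }
    SimpleDateFormat fmt = new SimpleDateFormat(TS_FORMAT);
    fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // Salesforce timestamps are UTC
    return deltaField + " >= " + fmt.format(startDate);
  }
}
```

With no delta field configured the condition stays empty, which is why the PK-chunking query degenerates to a full dump of the table.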
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=308117=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-308117 ]

ASF GitHub Bot logged work on GOBBLIN-865:
Author: ASF GitHub Bot
Created on: 06/Sep/19 19:32
Start Date: 06/Sep/19 19:32
Worklog Time Spent: 10m

Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r321880074

## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java (fetchPkChunkingResultSetWithRetry)

Review comment: You are right! There is no wait! I copied the retry logic from **fetchResultBatchWithRetry** and didn't look into it. I have another thought: since Gobblin re-executes a failed workUnit anyway, we could just remove the retry. What do you think?

Issue Time Tracking --- Worklog Id: (was: 308117) Time Spent: 3h 10m (was: 3h)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=308012=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-308012 ]

ASF GitHub Bot logged work on GOBBLIN-865:
Author: ASF GitHub Bot
Created on: 06/Sep/19 16:55
Start Date: 06/Sep/19 16:55
Worklog Time Spent: 10m

Work Description: zxcware commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r321822304

## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java (fetchPkChunkingResultSetWithRetry)

Review comment: So, bulkConnection.getQueryResultStream does not have retry? If we need retry, we should add some backoff to avoid immediate re-attempts, since consecutive retries without waiting are likely fruitless. In this case we can leverage `Retryer`; check out `WriterUtils.mkdirsWithRecursivePermissionWithRetry` for an example. It's straightforward to make the change.

Issue Time Tracking --- Worklog Id: (was: 308012) Time Spent: 2h 50m (was: 2h 40m)
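The backoff pattern the reviewer suggests (wait between attempts instead of retrying immediately) can be illustrated with a dependency-free sketch. This is a minimal illustration of what `Retryer` with an exponential wait strategy provides, assuming nothing beyond the JDK; it is not the code the PR ended up with.

```java
import java.util.concurrent.Callable;

public class BackoffRetry {
  /**
   * Run the callable up to maxAttempts times, sleeping between failed attempts
   * with exponential backoff: baseDelayMs, 2*baseDelayMs, 4*baseDelayMs, ...
   * Rethrows (wrapped) once all attempts are exhausted.
   */
  static <T> T callWithBackoff(Callable<T> call, int maxAttempts, long baseDelayMs) {
    Exception last = null;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      try {
        return call.call();
      } catch (Exception e) {
        last = e;
        if (attempt < maxAttempts - 1) {
          try {
            Thread.sleep(baseDelayMs << attempt); // 2^attempt * baseDelayMs
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // restore interrupt status and give up
            break;
          }
        }
      }
    }
    throw new RuntimeException("all " + maxAttempts + " attempts failed", last);
  }
}
```

A download call would be wrapped as `callWithBackoff(() -> bulkConnection.getQueryResultStream(...), 3, 1000L)`; whether that retry belongs at this layer at all is exactly what the thread debates.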
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=308013=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-308013 ]

ASF GitHub Bot logged work on GOBBLIN-865:
Author: ASF GitHub Bot
Created on: 06/Sep/19 16:55
Start Date: 06/Sep/19 16:55
Worklog Time Spent: 10m

Work Description: zxcware commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r321823807

## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java (executeQueryWithPkChunking, on the line `String condition = "";`)

Review comment: What about inclusiveness? Is the start time or end time inclusive or exclusive?

Issue Time Tracking --- Worklog Id: (was: 308013) Time Spent: 3h (was: 2h 50m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=307556=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307556 ]

ASF GitHub Bot logged work on GOBBLIN-865:
Author: ASF GitHub Bot
Created on: 06/Sep/19 00:49
Start Date: 06/Sep/19 00:49
Worklog Time Spent: 10m

Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r321539964

## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java (executeQueryWithPkChunking, on the line `String condition = "";`)

Review comment: How about we leave this refactoring for later? It doesn't look like I can use the function directly. In the PK-chunking case we only have a start time, not an end time; the end time is effectively the current time.

Issue Time Tracking --- Worklog Id: (was: 307556) Time Spent: 2h 40m (was: 2.5h)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=307551=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307551 ]

ASF GitHub Bot logged work on GOBBLIN-865:
Author: ASF GitHub Bot
Created on: 06/Sep/19 00:38
Start Date: 06/Sep/19 00:38
Worklog Time Spent: 10m

Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r321538410

## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java (fetchWithRetry)

Review comment: Closing the thread. Please reopen it, if necessary.

Issue Time Tracking --- Worklog Id: (was: 307551) Time Spent: 2.5h (was: 2h 20m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=306715=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306715 ]

ASF GitHub Bot logged work on GOBBLIN-865:
Author: ASF GitHub Bot
Created on: 04/Sep/19 20:48
Start Date: 04/Sep/19 20:48
Worklog Time Spent: 10m

Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r320968187

## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java (fetchWithRetry)

Review comment: The SFDC-side retrying is `to prepare and create result set files from query data`. After SFDC creates the result set files, we request to fetch/download them. Our retrying is for fetching/downloading the files.

Issue Time Tracking --- Worklog Id: (was: 306715) Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=306714=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306714 ]

ASF GitHub Bot logged work on GOBBLIN-865:
Author: ASF GitHub Bot
Created on: 04/Sep/19 20:46
Start Date: 04/Sep/19 20:46
Worklog Time Spent: 10m

Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r320967132

## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java

```java
private List<BatchIdAndResultId> executeQueryWithPkChunking(SourceState sourceState, long previousWatermark)
    throws RuntimeException {
  // ... (workUnitState setup as quoted earlier in this thread) ...
  try {
    // ...
    Partition partition = partitioner.getGlobalPartition(previousWatermark);
    String condition = "";
    Date startDate = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT);
    String field = sourceState.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY);
    if (startDate != null && field != null) {
      String lowWatermarkDate = Utils.dateToString(startDate, SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);
      condition = field + " >= " + lowWatermarkDate;
    }
    Predicate predicate = new Predicate(null, 0, condition, "", null);
    List<Predicate> predicateList = Arrays.asList(predicate);
    return salesforceExtractor.getQueryResultIds(entity, predicateList);
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}

private List<WorkUnit> createWorkUnits(SourceEntity sourceEntity, SourceState state, List<BatchIdAndResultId> batchResultIds) {
  String nameSpaceName = state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY);
  Extract.TableType tableType = Extract.TableType.valueOf(state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY).toUpperCase());
  String outputTableName = sourceEntity.getDestTableName();
  Extract extract = createExtract(tableType, nameSpaceName, outputTableName);

  List<WorkUnit> workUnits = Lists.newArrayList();
  int partitionNumber = state.getPropAsInt(SOURCE_SOURCE_MAX_NUMBER_OF_PARTITIONS, 1);
  int maxPartition = (batchResultIds.size() + partitionNumber - 1) / partitionNumber;
  List<List<BatchIdAndResultId>> partitionedResultIds = Lists.partition(batchResultIds, maxPartition);
  String bulkJobId = batchResultIds.get(0).getBulkJobId();
```

Review comment: Made it

```java
class SalesforceBulkJob {
  String jobId;
  List<BatchIdAndResultId> batchIdAndResultIdList;
}
```
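The grouping math in `createWorkUnits` above, ceiling division `(size + n - 1) / n` to get a per-group capacity followed by `Lists.partition` to cut the list into consecutive runs, can be sketched in plain Java without the Guava dependency. This is an illustrative reimplementation of that behavior, not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;

public class ResultIdPartitioner {
  /**
   * Split batchResultIds into at most maxWorkUnits groups: compute the ceiling
   * of size/maxWorkUnits as the per-group capacity, then cut consecutive runs
   * of that size (only the final group may be smaller), matching what
   * Guava's Lists.partition does in the PR.
   */
  static <T> List<List<T>> partitionForWorkUnits(List<T> batchResultIds, int maxWorkUnits) {
    int capacity = (batchResultIds.size() + maxWorkUnits - 1) / maxWorkUnits; // ceiling division
    List<List<T>> groups = new ArrayList<>();
    for (int i = 0; i < batchResultIds.size(); i += capacity) {
      // copy each consecutive run into its own list
      groups.add(new ArrayList<>(batchResultIds.subList(i, Math.min(i + capacity, batchResultIds.size()))));
    }
    return groups;
  }
}
```

For example, 7 batch result IDs with a maximum of 3 work units yields a capacity of 3 and groups of sizes 3, 3, and 1, so no work unit exceeds its share.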
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=306707=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306707 ]

ASF GitHub Bot logged work on GOBBLIN-865:
Author: ASF GitHub Bot
Created on: 04/Sep/19 20:29
Start Date: 04/Sep/19 20:29
Worklog Time Spent: 10m

Work Description: zxcware commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r320960136

## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java (fetchWithRetry)

Review comment: If the bulk API already does retry, can we remove the retry on our side? An application-layer retry on top of API-layer retries mostly won't change the error result, if any.

Issue Time Tracking --- Worklog Id: (was: 306707) Time Spent: 2h (was: 1h 50m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=305999=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-305999 ]

ASF GitHub Bot logged work on GOBBLIN-865:
Author: ASF GitHub Bot
Created on: 04/Sep/19 01:04
Start Date: 04/Sep/19 01:04
Worklog Time Spent: 10m

Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319713269

## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java

```java
workUnitState.setProp(ENABLE_PK_CHUNKING_KEY, true); // have the extractor enable PK chunking
int chunkSize = workUnitState.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, DEFAULT_PK_CHUNKING_SIZE);
```

Review comment: I was thinking we might want to keep second-level PK-chunking and that it would be better to have a different property for it. As we discussed, we don't think second-level PK-chunking makes sense. Will remove this property.

Issue Time Tracking --- Worklog Id: (was: 305999) Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=304792=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-304792 ]

ASF GitHub Bot logged work on GOBBLIN-865:
Author: ASF GitHub Bot
Created on: 31/Aug/19 06:16
Start Date: 31/Aug/19 06:16
Worklog Time Spent: 10m

Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319713269

## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java

```java
workUnitState.setProp(ENABLE_PK_CHUNKING_KEY, true); // have the extractor enable PK chunking
int chunkSize = workUnitState.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, DEFAULT_PK_CHUNKING_SIZE);
```

Review comment: I was thinking we might want to keep second-level PK-chunking and have a different property for it. As we discussed, we don't think second-level PK-chunking makes sense. Will remove this property.

Issue Time Tracking --- Worklog Id: (was: 304792) Time Spent: 1h 40m (was: 1.5h)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=304790=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-304790 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 31/Aug/19 06:14 Start Date: 31/Aug/19 06:14 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319713206 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java ## @@ -146,6 +156,95 @@ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity @Override protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) { +String partitionType = state.getProp(PARTITION_TYPE, "PK_CHUNKING"); +if (partitionType.equals("PK_CHUNKING")) { + return generateWorkUnitsPkChunking(sourceEntity, state, previousWatermark); +} else { + return generateWorkUnitsStrategy(sourceEntity, state, previousWatermark); +} + } + + /** + * generate workUnit with noQuery=true + */ + private List<WorkUnit> generateWorkUnitsPkChunking(SourceEntity sourceEntity, SourceState state, long previousWatermark) { + List<BatchIdAndResultId> batchIdAndResultIds = executeQueryWithPkChunking(state, previousWatermark); + List<WorkUnit> ret = createWorkUnits(sourceEntity, state, batchIdAndResultIds); + return ret; + } + + private List<BatchIdAndResultId> executeQueryWithPkChunking( + SourceState sourceState, + long previousWatermark + ) throws RuntimeException { +Properties commonProperties = sourceState.getCommonProperties(); Review comment: Good to know. Will fix them.
Issue Time Tracking --- Worklog Id: (was: 304790) Time Spent: 1h 20m (was: 1h 10m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=304791=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-304791 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 31/Aug/19 06:14 Start Date: 31/Aug/19 06:14 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319713206 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java ## @@ -146,6 +156,95 @@ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity @Override protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) { +String partitionType = state.getProp(PARTITION_TYPE, "PK_CHUNKING"); +if (partitionType.equals("PK_CHUNKING")) { + return generateWorkUnitsPkChunking(sourceEntity, state, previousWatermark); +} else { + return generateWorkUnitsStrategy(sourceEntity, state, previousWatermark); +} + } + + /** + * generate workUnit with noQuery=true + */ + private List<WorkUnit> generateWorkUnitsPkChunking(SourceEntity sourceEntity, SourceState state, long previousWatermark) { + List<BatchIdAndResultId> batchIdAndResultIds = executeQueryWithPkChunking(state, previousWatermark); + List<WorkUnit> ret = createWorkUnits(sourceEntity, state, batchIdAndResultIds); + return ret; + } + + private List<BatchIdAndResultId> executeQueryWithPkChunking( + SourceState sourceState, + long previousWatermark + ) throws RuntimeException { +Properties commonProperties = sourceState.getCommonProperties(); Review comment: Thanks! Good to know. Will fix them.
Issue Time Tracking --- Worklog Id: (was: 304791) Time Spent: 1.5h (was: 1h 20m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=304789=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-304789 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 31/Aug/19 06:13 Start Date: 31/Aug/19 06:13 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319713198 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -896,6 +928,43 @@ private void reinitializeBufferedReader() throws IOException, AsyncApiException } } + private List fetchWithRetry( Review comment: The Bulk API has its own retry: if the result set is too big, it will retry up to 15 times while creating/caching the result set. This retry is our own logic: when we fetch the result-set file, if there is any failure, we want to retry a specified number of times. It won't hurt; if the result file can be fetched on the first attempt, the function returns the result set immediately. Issue Time Tracking --- Worklog Id: (was: 304789) Time Spent: 1h 10m (was: 1h)
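The retry described in the comment above is bounded and returns on the first success. A minimal sketch of that shape, with illustrative names rather than the actual `SalesforceExtractor.fetchWithRetry` signature:

```java
import java.util.function.Supplier;

// Hypothetical sketch of a bounded fetch-retry: attempt the fetch up to
// maxAttempts times, return on the first success, rethrow the last failure.
public class BoundedRetry {
    public static <T> T fetchWithRetry(Supplier<T> fetch, int maxAttempts) {
        if (maxAttempts <= 0) {
            throw new IllegalArgumentException("maxAttempts must be positive");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return fetch.get(); // success on the first try returns immediately
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // every attempt failed
    }
}
```

A caller that succeeds on attempt three pays nothing extra on a healthy connection, which matches the "it won't hurt" point above.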
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=304589=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-304589 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 30/Aug/19 20:52 Start Date: 30/Aug/19 20:52 Worklog Time Spent: 10m Work Description: zxcware commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319667737 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java ## @@ -146,6 +156,95 @@ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity @Override protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) { +String partitionType = state.getProp(PARTITION_TYPE, "PK_CHUNKING"); +if (partitionType.equals("PK_CHUNKING")) { + return generateWorkUnitsPkChunking(sourceEntity, state, previousWatermark); +} else { + return generateWorkUnitsStrategy(sourceEntity, state, previousWatermark); +} + } + + /** + * generate workUnit with noQuery=true + */ + private List<WorkUnit> generateWorkUnitsPkChunking(SourceEntity sourceEntity, SourceState state, long previousWatermark) { + List<BatchIdAndResultId> batchIdAndResultIds = executeQueryWithPkChunking(state, previousWatermark); + List<WorkUnit> ret = createWorkUnits(sourceEntity, state, batchIdAndResultIds); + return ret; + } + + private List<BatchIdAndResultId> executeQueryWithPkChunking( + SourceState sourceState, + long previousWatermark + ) throws RuntimeException { +Properties commonProperties = sourceState.getCommonProperties(); Review comment: What's the motivation for lines 180-183? If you just want a copy, we can simply do `State state = new State(sourceState)`.
Issue Time Tracking --- Worklog Id: (was: 304589) Time Spent: 1h (was: 50m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=304590=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-304590 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 30/Aug/19 20:52 Start Date: 30/Aug/19 20:52 Worklog Time Spent: 10m Work Description: zxcware commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319667958 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java ## @@ -146,6 +156,95 @@ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity @Override protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) { +String partitionType = state.getProp(PARTITION_TYPE, "PK_CHUNKING"); +if (partitionType.equals("PK_CHUNKING")) { + return generateWorkUnitsPkChunking(sourceEntity, state, previousWatermark); +} else { + return generateWorkUnitsStrategy(sourceEntity, state, previousWatermark); +} + } + + /** + * generate workUnit with noQuery=true + */ + private List<WorkUnit> generateWorkUnitsPkChunking(SourceEntity sourceEntity, SourceState state, long previousWatermark) { + List<BatchIdAndResultId> batchIdAndResultIds = executeQueryWithPkChunking(state, previousWatermark); + List<WorkUnit> ret = createWorkUnits(sourceEntity, state, batchIdAndResultIds); + return ret; + } + + private List<BatchIdAndResultId> executeQueryWithPkChunking( + SourceState sourceState, + long previousWatermark + ) throws RuntimeException { +Properties commonProperties = sourceState.getCommonProperties(); +Properties specProperties = sourceState.getSpecProperties(); +State state = new State(); +state.setProps(commonProperties, specProperties); +WorkUnit workUnit = WorkUnit.createEmpty(); +try { + WorkUnitState workUnitState = new WorkUnitState(workUnit, state); + workUnitState.setId("test" + new Random().nextInt()); + 
workUnitState.setProp(ENABLE_PK_CHUNKING_KEY, true); // set extractor enable pk chunking + int chunkSize = workUnitState.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, DEFAULT_PK_CHUNKING_SIZE); Review comment: Is it necessary to introduce another config key for the chunk size? Issue Time Tracking --- Worklog Id: (was: 304590) Time Spent: 1h (was: 50m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=304591=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-304591 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 30/Aug/19 20:52 Start Date: 30/Aug/19 20:52 Worklog Time Spent: 10m Work Description: zxcware commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319669256 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java ## @@ -146,6 +156,95 @@ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity @Override protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) { +String partitionType = state.getProp(PARTITION_TYPE, "PK_CHUNKING"); +if (partitionType.equals("PK_CHUNKING")) { + return generateWorkUnitsPkChunking(sourceEntity, state, previousWatermark); +} else { + return generateWorkUnitsStrategy(sourceEntity, state, previousWatermark); +} + } + + /** + * generate workUnit with noQuery=true + */ + private List<WorkUnit> generateWorkUnitsPkChunking(SourceEntity sourceEntity, SourceState state, long previousWatermark) { + List<BatchIdAndResultId> batchIdAndResultIds = executeQueryWithPkChunking(state, previousWatermark); + List<WorkUnit> ret = createWorkUnits(sourceEntity, state, batchIdAndResultIds); + return ret; + } + + private List<BatchIdAndResultId> executeQueryWithPkChunking( + SourceState sourceState, + long previousWatermark + ) throws RuntimeException { +Properties commonProperties = sourceState.getCommonProperties(); +Properties specProperties = sourceState.getSpecProperties(); +State state = new State(); +state.setProps(commonProperties, specProperties); +WorkUnit workUnit = WorkUnit.createEmpty(); +try { + WorkUnitState workUnitState = new WorkUnitState(workUnit, state); + workUnitState.setId("test" + new Random().nextInt()); + 
workUnitState.setProp(ENABLE_PK_CHUNKING_KEY, true); // set extractor enable pk chunking + int chunkSize = workUnitState.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, DEFAULT_PK_CHUNKING_SIZE); + workUnitState.setProp(PK_CHUNKING_SIZE_KEY, chunkSize); // set extractor pk chunking size + workUnitState.setProp(PK_CHUNKING_SKIP_COUNT_CHECK, true); // don't use count check since we couldn't get a count + SalesforceExtractor salesforceExtractor = (SalesforceExtractor) this.getExtractor(workUnitState); + String entity = sourceState.getProp(ConfigurationKeys.SOURCE_ENTITY); + Partitioner partitioner = new Partitioner(sourceState); + if (isEarlyStopEnabled(state) && partitioner.isFullDump()) { +throw new UnsupportedOperationException("Early stop mode cannot work with full dump mode."); + } + Partition partition = partitioner.getGlobalPartition(previousWatermark); + String condition = ""; Review comment: For lines 199-207: setting the query predicate has additional considerations beyond the values. Check out `QueryBasedExtractor.setRangePredicates` for more details. You probably need to refactor the `QueryBasedExtractor.build` method for reusability. Issue Time Tracking --- Worklog Id: (was: 304591) Time Spent: 1h (was: 50m)
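The predicate under discussion is the low-watermark bound that `executeQueryWithPkChunking` builds before fetching result ids: when both a delta field and a start date are present, the condition becomes `<field> >= <timestamp>`, otherwise it stays empty. A hypothetical sketch of just that condition-string step (names are illustrative; the real code wraps this in a `Predicate` and, per the review, should reuse `QueryBasedExtractor.setRangePredicates`):

```java
// Hypothetical sketch of the low-watermark condition built for the PK-chunking
// query: a non-null delta field plus a non-null watermark yields a bound,
// anything else yields an empty condition (full pull).
public class WatermarkPredicate {
    public static String buildCondition(String deltaField, String lowWatermark) {
        if (deltaField == null || lowWatermark == null) {
            return ""; // no incremental bound: scan the whole table
        }
        return deltaField + " >= " + lowWatermark;
    }
}
```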
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=304593=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-304593 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 30/Aug/19 20:52 Start Date: 30/Aug/19 20:52 Worklog Time Spent: 10m Work Description: zxcware commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319667018 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ## @@ -896,6 +928,43 @@ private void reinitializeBufferedReader() throws IOException, AsyncApiException } } + private List fetchWithRetry( Review comment: Does the Bulk API already retry internally? Issue Time Tracking --- Worklog Id: (was: 304593) Time Spent: 1h (was: 50m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=304592=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-304592 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 30/Aug/19 20:52 Start Date: 30/Aug/19 20:52 Worklog Time Spent: 10m Work Description: zxcware commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319671525 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java ## @@ -146,6 +156,95 @@ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity @Override protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) { +String partitionType = state.getProp(PARTITION_TYPE, "PK_CHUNKING"); +if (partitionType.equals("PK_CHUNKING")) { + return generateWorkUnitsPkChunking(sourceEntity, state, previousWatermark); +} else { + return generateWorkUnitsStrategy(sourceEntity, state, previousWatermark); +} + } + + /** + * generate workUnit with noQuery=true + */ + private List<WorkUnit> generateWorkUnitsPkChunking(SourceEntity sourceEntity, SourceState state, long previousWatermark) { + List<BatchIdAndResultId> batchIdAndResultIds = executeQueryWithPkChunking(state, previousWatermark); + List<WorkUnit> ret = createWorkUnits(sourceEntity, state, batchIdAndResultIds); + return ret; + } + + private List<BatchIdAndResultId> executeQueryWithPkChunking( + SourceState sourceState, + long previousWatermark + ) throws RuntimeException { +Properties commonProperties = sourceState.getCommonProperties(); +Properties specProperties = sourceState.getSpecProperties(); +State state = new State(); +state.setProps(commonProperties, specProperties); +WorkUnit workUnit = WorkUnit.createEmpty(); +try { + WorkUnitState workUnitState = new WorkUnitState(workUnit, state); + workUnitState.setId("test" + new Random().nextInt()); + 
workUnitState.setProp(ENABLE_PK_CHUNKING_KEY, true); // set extractor enable pk chunking + int chunkSize = workUnitState.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, DEFAULT_PK_CHUNKING_SIZE); + workUnitState.setProp(PK_CHUNKING_SIZE_KEY, chunkSize); // set extractor pk chunking size + workUnitState.setProp(PK_CHUNKING_SKIP_COUNT_CHECK, true); // don't use count check since we couldn't get a count + SalesforceExtractor salesforceExtractor = (SalesforceExtractor) this.getExtractor(workUnitState); + String entity = sourceState.getProp(ConfigurationKeys.SOURCE_ENTITY); + Partitioner partitioner = new Partitioner(sourceState); + if (isEarlyStopEnabled(state) && partitioner.isFullDump()) { +throw new UnsupportedOperationException("Early stop mode cannot work with full dump mode."); + } + Partition partition = partitioner.getGlobalPartition(previousWatermark); + String condition = ""; + Date startDate = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT); + String field = sourceState.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY); + if (startDate != null && field != null) { +String lowWatermarkDate = Utils.dateToString(startDate, SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT); +condition = field + " >= " + lowWatermarkDate; + } + Predicate predicate = new Predicate(null, 0, condition, "", null); + List<Predicate> predicateList = Arrays.asList(predicate); + List<BatchIdAndResultId> ids = salesforceExtractor.getQueryResultIds(entity, predicateList); + return ids; +} catch (Exception e) { + throw new RuntimeException(e); +} + } + + private List<WorkUnit> createWorkUnits( + SourceEntity sourceEntity, + SourceState state, + List<BatchIdAndResultId> batchResultIds + ) { +String nameSpaceName = state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY); +Extract.TableType tableType = Extract.TableType.valueOf(state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY).toUpperCase()); +String outputTableName = sourceEntity.getDestTableName(); +Extract extract = createExtract(tableType, nameSpaceName, outputTableName); + +List<WorkUnit> workUnits = Lists.newArrayList(); +int partitionNumber = state.getPropAsInt(SOURCE_SOURCE_MAX_NUMBER_OF_PARTITIONS, 1); +int maxPartition = (batchResultIds.size() + partitionNumber - 1)/partitionNumber; +List<List<BatchIdAndResultId>> partitionedResultIds = Lists.partition(batchResultIds, maxPartition); +String bulkJobId = batchResultIds.get(0).getBulkJobId(); Review comment: We could define some object that includes the jobId and the BatchIdAndResultId list, given that the jobId is the same for all batches of a job, which helps reduce duplicate data. For example: ``` SalesforceBulkJob { String
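The work-unit creation discussed above sizes each partition by ceiling division and then slices the batch/result-id list into consecutive chunks with Guava's `Lists.partition`. A dependency-free sketch of the same slicing, with illustrative names:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of spreading batch/result ids across at most
// maxPartitions work units: per-partition capacity is the ceiling of
// size / maxPartitions, then the list is cut into consecutive slices
// (the same behavior as Guava's Lists.partition with that capacity).
public class WorkUnitSlicer {
    public static <T> List<List<T>> slice(List<T> items, int maxPartitions) {
        if (maxPartitions <= 0) {
            throw new IllegalArgumentException("maxPartitions must be positive");
        }
        int capacity = (items.size() + maxPartitions - 1) / maxPartitions; // ceil
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i += capacity) {
            out.add(items.subList(i, Math.min(i + capacity, items.size())));
        }
        return out;
    }
}
```

With 7 result ids and 3 partitions the capacity is 3, so the slices have sizes 3, 3, and 1; only the final slice can be smaller.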
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=303920=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-303920 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 29/Aug/19 21:12 Start Date: 29/Aug/19 21:12 Worklog Time Spent: 10m Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319273784 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceWorkUnit.java ## @@ -0,0 +1,15 @@ +package org.apache.gobblin.salesforce; + +import java.util.Iterator; +import java.util.List; +import org.apache.gobblin.source.workunit.Extract; +import org.apache.gobblin.source.workunit.WorkUnit; + + +public class SalesforceWorkUnit extends WorkUnit { Review comment: Thanks, Hung! Will remove this class. Will also test with `launcher.type=MAPREDUCE`; I only tested `type=LOCAL`. Issue Time Tracking --- Worklog Id: (was: 303920) Time Spent: 50m (was: 40m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=303892=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-303892 ] ASF GitHub Bot logged work on GOBBLIN-865: -- Author: ASF GitHub Bot Created on: 29/Aug/19 20:29 Start Date: 29/Aug/19 20:29 Worklog Time Spent: 10m Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319257400 ## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceWorkUnit.java ## @@ -0,0 +1,15 @@ +package org.apache.gobblin.salesforce; + +import java.util.Iterator; +import java.util.List; +import org.apache.gobblin.source.workunit.Extract; +import org.apache.gobblin.source.workunit.WorkUnit; + + +public class SalesforceWorkUnit extends WorkUnit { Review comment: This class won't serialize all the state. Instead of adding this class, you should add a config key for the batch and result id list. Something like "salesforceSource.batchAndResultList" with a value like "xxx1:zzz1,xxx2:zzz2". Also, will you retry fetching from this state if the result is still available in the next run after a task failure? Issue Time Tracking --- Worklog Id: (was: 303892) Time Spent: 40m (was: 0.5h)
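The suggestion above packs the batch/result id pairs into a single config value such as `xxx1:zzz1,xxx2:zzz2` so they survive work-unit serialization. A hypothetical round-trip codec for that format (the key name and the `batchId:resultId` layout are the reviewer's examples, not a shipped Gobblin config):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical codec for the "xxx1:zzz1,xxx2:zzz2" config value suggested in
// the review: one batchId:resultId pair per comma-separated token.
public class BatchResultIdCodec {
    public static String encode(Map<String, String> batchToResult) {
        return batchToResult.entrySet().stream()
            .map(e -> e.getKey() + ":" + e.getValue())
            .collect(Collectors.joining(","));
    }

    public static Map<String, String> decode(String value) {
        Map<String, String> out = new LinkedHashMap<>(); // keep batch order
        if (value.isEmpty()) {
            return out;
        }
        for (String token : value.split(",")) {
            String[] parts = token.split(":", 2); // split at the first ':' only
            out.put(parts[0], parts[1]);
        }
        return out;
    }
}
```

Storing the list as a plain string property sidesteps the serialization problem the reviewer raises, since work-unit properties round-trip through state stores unchanged.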
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=303891&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-303891 ]

ASF GitHub Bot logged work on GOBBLIN-865:
------------------------------------------

Author: ASF GitHub Bot
Created on: 29/Aug/19 20:28
Start Date: 29/Aug/19 20:28
Worklog Time Spent: 10m

Work Description: htran1 commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319257400

## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceWorkUnit.java
## @@ -0,0 +1,15 @@
+package org.apache.gobblin.salesforce;
+
+import java.util.Iterator;
+import java.util.List;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+public class SalesforceWorkUnit extends WorkUnit {

Review comment: This class won't serialize all the state. Instead of adding this class, you should add a config key for the batch and result id list, something like "salesforceSource.batchAndResultList" with a value like "xxx1:zzz1,xxx2:zzz2". Also, will you retry fetching from this state if the result is still available?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
-------------------

Worklog Id: (was: 303891)
Time Spent: 0.5h (was: 20m)

> Add feature that enables PK-chunking in partition
> -------------------------------------------------
>
> Key: GOBBLIN-865
> URL: https://issues.apache.org/jira/browse/GOBBLIN-865
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Alex Li
> Priority: Major
> Labels: salesforce
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> In the SFDC (Salesforce) connector, we have partitioning mechanisms to split a giant query into multiple sub-queries. There are 3 mechanisms:
> * simple partition (equally split by time)
> * dynamic pre-partition (generate a histogram and split by row numbers)
> * user-specified partition (set up a time range in the job file)
> However, tables like Task and Contract fail from time to time to fetch the full data.
> We may want to utilize PK-chunking to partition the query.
>
> The PK-chunking doc from SFDC:
> [https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/async_api_headers_enable_pk_chunking.htm]

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
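The review comment above suggests replacing the `SalesforceWorkUnit` subclass with a plain config key whose value packs the batch and result ids as `batchId:resultId` pairs joined by commas, so that ordinary `WorkUnit` state serialization carries them. A minimal sketch of that encoding, with a hypothetical helper class name (the key name `salesforceSource.batchAndResultList` and the `xxx1:zzz1,xxx2:zzz2` format come from the review comment; everything else is illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating the reviewer's suggestion: keep the
// Bulk API batch/result id pairs in one config value, e.g.
//   salesforceSource.batchAndResultList = "xxx1:zzz1,xxx2:zzz2"
public class BatchIdAndResultId {
  public final String batchId;
  public final String resultId;

  public BatchIdAndResultId(String batchId, String resultId) {
    this.batchId = batchId;
    this.resultId = resultId;
  }

  // Encode pairs as "batchId:resultId" entries joined by commas.
  public static String encode(List<BatchIdAndResultId> pairs) {
    StringBuilder sb = new StringBuilder();
    for (BatchIdAndResultId p : pairs) {
      if (sb.length() > 0) {
        sb.append(',');
      }
      sb.append(p.batchId).append(':').append(p.resultId);
    }
    return sb.toString();
  }

  // Decode the config value back into pairs.
  public static List<BatchIdAndResultId> decode(String value) {
    List<BatchIdAndResultId> pairs = new ArrayList<>();
    for (String entry : value.split(",")) {
      String[] parts = entry.split(":", 2);
      pairs.add(new BatchIdAndResultId(parts[0], parts[1]));
    }
    return pairs;
  }

  public static void main(String[] args) {
    List<BatchIdAndResultId> pairs = new ArrayList<>();
    pairs.add(new BatchIdAndResultId("xxx1", "zzz1"));
    pairs.add(new BatchIdAndResultId("xxx2", "zzz2"));
    System.out.println(encode(pairs)); // xxx1:zzz1,xxx2:zzz2
  }
}
```

Because the value is a single string, it survives whatever the `WorkUnit` state serializer does, which is the point of the reviewer's objection to the subclass.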
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=303826&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-303826 ]

ASF GitHub Bot logged work on GOBBLIN-865:
------------------------------------------

Author: ASF GitHub Bot
Created on: 29/Aug/19 18:08
Start Date: 29/Aug/19 18:08
Worklog Time Spent: 10m

Work Description: mvachhani commented on issue #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#issuecomment-526299219

+ @htran1 for review.

Issue Time Tracking
-------------------

Worklog Id: (was: 303826)
Time Spent: 20m (was: 10m)
[jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition
[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=303317&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-303317 ]

ASF GitHub Bot logged work on GOBBLIN-865:
------------------------------------------

Author: ASF GitHub Bot
Created on: 29/Aug/19 01:26
Start Date: 29/Aug/19 01:26
Worklog Time Spent: 10m

Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

### JIRA
- [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
- https://issues.apache.org/jira/browse/GOBBLIN-XXX

### Description
- [ ] Here are some details about my PR, including screenshots (if applicable):

### Tests
- [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:

### Commits
- [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
  1. Subject is separated from body by a blank line
  2. Subject is limited to 50 characters
  3. Subject does not end with a period
  4. Subject uses the imperative mood ("add", not "adding")
  5. Body wraps at 72 characters
  6. Body explains "what" and "why", not "how"

Issue Time Tracking
-------------------

Worklog Id: (was: 303317)
Remaining Estimate: 0h
Time Spent: 10m
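The PK-chunking feature discussed in this thread is enabled per the linked Salesforce doc by sending a request header on the Bulk API job, after which Salesforce splits the query into id-ordered chunks server-side. A small sketch of building that header (the header name and the `chunkSize`/`parent` options are from the Salesforce doc; the helper class and method names are illustrative, not the PR's actual code):

```java
// Sketch of the Bulk API request header that turns on PK chunking.
// Salesforce then partitions the query into chunks of ordered record ids,
// which is the alternative to the connector's time-based partitioning.
public class PkChunkingHeader {
  public static final String HEADER_NAME = "Sforce-Enable-PKChunking";

  // Build a value such as "chunkSize=250000; parent=Account".
  // parent may be null when querying the object directly.
  public static String headerValue(int chunkSize, String parent) {
    StringBuilder sb = new StringBuilder("chunkSize=").append(chunkSize);
    if (parent != null) {
      sb.append("; parent=").append(parent);
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // e.g. attach to the job-creation request of whatever HTTP client is used:
    //   request.addHeader(HEADER_NAME, headerValue(250000, null));
    System.out.println(HEADER_NAME + ": " + headerValue(250000, null));
  }
}
```

Each resulting chunk becomes an extra batch on the job, which is why the review discussion above centers on how the batch and result ids get carried in work-unit state.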