[
https://issues.apache.org/jira/browse/GOBBLIN-1584?focusedWorklogId=689728&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-689728
]
ASF GitHub Bot logged work on GOBBLIN-1584:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 03/Dec/21 02:29
Start Date: 03/Dec/21 02:29
Worklog Time Spent: 10m
Work Description: phet commented on a change in pull request #3438:
URL: https://github.com/apache/gobblin/pull/3438#discussion_r761592819
##########
File path:
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
##########
@@ -216,6 +216,12 @@
public static final String QUEUED_TASK_TIME_MAX_AGE =
"taskexecutor.queued_task_time.history.max_age";
public static final long DEFAULT_QUEUED_TASK_TIME_MAX_AGE =
TimeUnit.HOURS.toMillis(1);
+ /**
+ * Optional property to specify for Mysql ingestion whether insertions
should do replace logic for records
+ * with the same PRIMARY KEY or unique identifier.
+ */
+ public static final String REPLACE_FOR_MYSQL_INGESTION =
"replace.allowed.for.mysql.ingestion";
Review comment:
it could be argued both ways, but I'd lean toward a universal,
non-db-specific config key... even if we only support/implement on mysql for
now. mysql makes it trivially easy to implement, but we could presumably
support other engines using different syntax or even just more complex handling
on our part. until that time though, I suggest failing fast when instantiating
a different type (e.g. of inserter or insert initializer) if such config is
requested.
overall, the universal key leaves open that future where we support other
DBs. naming-wise, 'update' or 'allow.overwrite' may fit.
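A minimal sketch of the fail-fast idea, assuming a generic key. The key name
`jdbc.writer.allow.overwrite`, the class `OverwriteConfig`, and the use of
`java.util.Properties` in place of Gobblin's `State` are all hypothetical,
chosen only for illustration:

```java
import java.util.Properties;

// Hypothetical sketch: the key and class names are illustrative, not Gobblin's.
final class OverwriteConfig {
  // Assumed engine-neutral key name.
  static final String ALLOW_OVERWRITE_KEY = "jdbc.writer.allow.overwrite";

  /** Returns true only when the key is present and set to "true". */
  static boolean isOverwriteRequested(Properties props) {
    return Boolean.parseBoolean(props.getProperty(ALLOW_OVERWRITE_KEY, "false"));
  }

  /** Fails fast when overwrite is requested for an engine with no implementation yet. */
  static void checkSupported(String engine, Properties props) {
    if (isOverwriteRequested(props) && !"mysql".equalsIgnoreCase(engine)) {
      throw new UnsupportedOperationException(
          ALLOW_OVERWRITE_KEY + " is not supported for engine: " + engine);
    }
  }
}
```

Calling `checkSupported` while constructing a non-mysql inserter would surface
the unsupported config at startup rather than mid-ingestion.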
##########
File path:
gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/BaseJdbcBufferedInserter.java
##########
@@ -51,6 +51,7 @@
private static final Logger LOG =
LoggerFactory.getLogger(BaseJdbcBufferedInserter.class);
protected static final String INSERT_STATEMENT_PREFIX_FORMAT = "INSERT INTO
%s.%s (%s) VALUES ";
+ protected static final String REPLACE_STATEMENT_PREFIX_FORMAT = "REPLACE
INTO %s.%s (%s) VALUES ";
Review comment:
`REPLACE` is non-standard-SQL, specific to Mysql, so it shouldn't live
in the base. other DBs may support a `MERGE` statement... or have their own
extensions.
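One possible shape, as a sketch only: the base keeps the standard `INSERT`
prefix and exposes an overridable hook, and the mysql subclass owns the
`REPLACE` format. The class names and the `statementPrefixFormat()` hook are
hypothetical stand-ins, not the existing Gobblin classes:

```java
// Hypothetical sketch: class and method names are illustrative stand-ins,
// not the actual Gobblin classes.
abstract class BaseInserterSketch {
  protected static final String INSERT_STATEMENT_PREFIX_FORMAT = "INSERT INTO %s.%s (%s) VALUES ";

  /** Engine-specific subclasses may override this to use their own syntax. */
  protected String statementPrefixFormat() {
    return INSERT_STATEMENT_PREFIX_FORMAT;
  }

  String buildPrefix(String db, String table, String columns) {
    return String.format(statementPrefixFormat(), db, table, columns);
  }
}

final class MySqlInserterSketch extends BaseInserterSketch {
  // REPLACE is a MySQL extension, so the format string is declared here only.
  private static final String REPLACE_STATEMENT_PREFIX_FORMAT = "REPLACE INTO %s.%s (%s) VALUES ";
  private final boolean overwriteRecords;

  MySqlInserterSketch(boolean overwriteRecords) {
    this.overwriteRecords = overwriteRecords;
  }

  @Override
  protected String statementPrefixFormat() {
    return overwriteRecords ? REPLACE_STATEMENT_PREFIX_FORMAT : INSERT_STATEMENT_PREFIX_FORMAT;
  }
}
```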
##########
File path:
gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlWriterCommands.java
##########
@@ -47,14 +48,17 @@
private static final String INFORMATION_SCHEMA_SELECT_SQL_PSTMT =
"SELECT column_name, column_type FROM information_schema.columns WHERE
table_schema = ? AND table_name = ?";
private static final String COPY_INSERT_STATEMENT_FORMAT = "INSERT INTO
%s.%s SELECT * FROM %s.%s";
+ private static final String COPY_REPLACE_STATEMENT_FORMAT = "REPLACE INTO
%s.%s SELECT * FROM %s.%s";
private static final String DELETE_STATEMENT_FORMAT = "DELETE FROM %s.%s";
private final JdbcBufferedInserter jdbcBufferedWriter;
private final Connection conn;
+ private final boolean replaceExistingValues;
public MySqlWriterCommands(State state, Connection conn) {
this.conn = conn;
this.jdbcBufferedWriter = new MySqlBufferedInserter(state, conn);
+ this.replaceExistingValues =
state.getPropAsBoolean(ConfigurationKeys.REPLACE_FOR_MYSQL_INGESTION);
Review comment:
I hold a bias toward ctors taking typed params, rather than sifting
through config for special keys. up to you, but I'd read config earlier on the
call stack and translate to a `boolean` param, ultimately used here.
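Roughly what I have in mind, as a sketch: a factory reads the key once and the
commands class only ever sees a `boolean`. `WriterCommandsSketch`,
`WriterCommandsFactorySketch`, and the use of `java.util.Properties` instead of
`State` are hypothetical, and the argument order passed to the format strings
(target first, then source) is an assumption:

```java
import java.util.Properties;

// Hypothetical sketch: names are illustrative; Properties stands in for State.
final class WriterCommandsSketch {
  private static final String COPY_INSERT_STATEMENT_FORMAT = "INSERT INTO %s.%s SELECT * FROM %s.%s";
  private static final String COPY_REPLACE_STATEMENT_FORMAT = "REPLACE INTO %s.%s SELECT * FROM %s.%s";

  private final boolean replaceExistingValues;

  // Typed parameter: the constructor never inspects config itself.
  WriterCommandsSketch(boolean replaceExistingValues) {
    this.replaceExistingValues = replaceExistingValues;
  }

  /** Builds the copy statement; target-then-source ordering is assumed. */
  String copyStatement(String targetDb, String targetTable, String sourceDb, String sourceTable) {
    String format = replaceExistingValues ? COPY_REPLACE_STATEMENT_FORMAT : COPY_INSERT_STATEMENT_FORMAT;
    return String.format(format, targetDb, targetTable, sourceDb, sourceTable);
  }
}

final class WriterCommandsFactorySketch {
  static final String REPLACE_KEY = "replace.allowed.for.mysql.ingestion";

  /** Reads config earlier on the call stack and translates it to a boolean. */
  static WriterCommandsSketch create(Properties props) {
    boolean replace = Boolean.parseBoolean(props.getProperty(REPLACE_KEY, "false"));
    return new WriterCommandsSketch(replace);
  }
}
```

This also makes the class simple to unit test, since a test can pass `true` or
`false` directly without building a config object.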
##########
File path:
gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/JdbcPublisherTest.java
##########
@@ -87,6 +87,22 @@ public void testPublish() throws IOException, SQLException {
inOrder.verify(workUnitState,
times(1)).setWorkingState(WorkUnitState.WorkingState.COMMITTED);
inOrder.verify(conn, times(1)).commit();
inOrder.verify(conn, times(1)).close();
+ }
+
+ @Test(dependsOnMethods = { "testPublish" })
Review comment:
I can't confirm that it would actually exercise any of the
mysql-specific code you modified... does it?
I didn't see you update the `JdbcPublisher` class (above), so perhaps the
better unit test to augment would be `MySqlBufferedInserterTest`?
##########
File path:
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
##########
@@ -216,6 +216,12 @@
public static final String QUEUED_TASK_TIME_MAX_AGE =
"taskexecutor.queued_task_time.history.max_age";
public static final long DEFAULT_QUEUED_TASK_TIME_MAX_AGE =
TimeUnit.HOURS.toMillis(1);
+ /**
+ * Optional property to specify for Mysql ingestion whether insertions
should do replace logic for records
+ * with the same PRIMARY KEY or unique identifier.
+ */
+ public static final String REPLACE_FOR_MYSQL_INGESTION =
"replace.allowed.for.mysql.ingestion";
Review comment:
also, be sure to note in the documentation that replacement granularity
is 'record-level', meaning that if there are duplicates in the input (by
primary key) they'd even replace one another. hence which record ultimately
gets loaded is undefined (since they may be in separate WUs, which are not
processed in a specific order).
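To illustrate the point for the docs, a small simulation (not Gobblin code; the
class `ReplaceOrderSketch` is hypothetical and a `HashMap` stands in for a table
keyed by primary key). With record-level replace the last write for a key wins,
so the loaded value depends on processing order:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation: a map keyed by primary key stands in for the table.
final class ReplaceOrderSketch {
  /** Applies records in the given order; a later record replaces an earlier one. */
  static Map<Integer, String> load(List<SimpleEntry<Integer, String>> records) {
    Map<Integer, String> table = new HashMap<>();
    for (SimpleEntry<Integer, String> record : records) {
      table.put(record.getKey(), record.getValue());
    }
    return table;
  }

  public static void main(String[] args) {
    SimpleEntry<Integer, String> a = new SimpleEntry<>(1, "from-WU-A");
    SimpleEntry<Integer, String> b = new SimpleEntry<>(1, "from-WU-B");
    // Same input records, two processing orders, two different final rows.
    System.out.println(load(Arrays.asList(a, b)).get(1));
    System.out.println(load(Arrays.asList(b, a)).get(1));
  }
}
```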
Issue Time Tracking
-------------------
Worklog Id: (was: 689728)
Time Spent: 1h 20m (was: 1h 10m)
> Add Replace Logic To Mysql Writer
> ---------------------------------
>
> Key: GOBBLIN-1584
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1584
> Project: Apache Gobblin
> Issue Type: Improvement
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> We only support {{insertion}} of new values into tables, not
> {{update/upsert}}. If you run an ingestion job containing a record with
> the same primary key as an existing row, the whole job will fail because of
> the duplicate entry. We don't expect ingestion jobs to always contain _only new_
> records, so we should handle duplicate entries, ingestion of data already in
> the table, and updates to old values. The insert vs. replace logic should be
> configurable by the user as well.