This is an automated email from the ASF dual-hosted git repository.
hansva pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hop.git
The following commit(s) were added to refs/heads/main by this push:
new b37a4d80a0 Synchronize After Merge - specify connection through
variables #3839 (#3937)
b37a4d80a0 is described below
commit b37a4d80a08ec8f6b8a9b57bec25ef0bd0eeb043
Author: Nicolas Adment <[email protected]>
AuthorDate: Thu May 16 22:10:35 2024 +0200
Synchronize After Merge - specify connection through variables #3839 (#3937)
* Synchronize After Merge - specify connection through variables #3839
* Add variable to the test, #3839
---------
Co-authored-by: Hans Van Akelyen <[email protected]>
---
.../database/0013-synchronize-after-merge.hpl | 152 +++++++++------------
.../transforms/insertupdate/InsertUpdateMeta.java | 5 +-
.../SynchronizeAfterMerge.java | 62 ++++-----
.../SynchronizeAfterMergeDialog.java | 39 +++---
.../SynchronizeAfterMergeMeta.java | 82 ++++-------
.../SynchronizeAfterMergeMetaInjectionTest.java | 4 +-
.../SynchronizeAfterMergeMetaTest.java | 12 +-
.../SynchronizeAfterMergeTest.java | 2 +-
8 files changed, 154 insertions(+), 204 deletions(-)
diff --git a/integration-tests/database/0013-synchronize-after-merge.hpl
b/integration-tests/database/0013-synchronize-after-merge.hpl
index df7f6beb6e..2f1306884b 100644
--- a/integration-tests/database/0013-synchronize-after-merge.hpl
+++ b/integration-tests/database/0013-synchronize-after-merge.hpl
@@ -35,8 +35,6 @@ limitations under the License.
<created_date>2021/04/30 11:01:28.333</created_date>
<modified_user>-</modified_user>
<modified_date>2021/04/30 11:01:28.333</modified_date>
- <key_for_session_key>H4sIAAAAAAAAAAMAAAAAAAAAAAA=</key_for_session_key>
- <is_key_private>N</is_key_private>
</info>
<notepads>
</notepads>
@@ -57,69 +55,6 @@ limitations under the License.
<enabled>Y</enabled>
</hop>
</order>
- <transform>
- <name>source</name>
- <type>DataGrid</type>
- <description/>
- <distribute>Y</distribute>
- <custom_distribution/>
- <copies>1</copies>
- <partitioning>
- <method>none</method>
- <schema_name/>
- </partitioning>
- <fields>
- <field>
- <name>key</name>
- <type>String</type>
- <format/>
- <currency/>
- <decimal/>
- <group/>
- <length>-1</length>
- <precision>-1</precision>
- <set_empty_string>N</set_empty_string>
- </field>
- <field>
- <name>value</name>
- <type>String</type>
- <format/>
- <currency/>
- <decimal/>
- <group/>
- <length>-1</length>
- <precision>-1</precision>
- <set_empty_string>N</set_empty_string>
- </field>
- </fields>
- <data>
- <line>
- <item>10</item>
- <item>aa</item>
- </line>
- <line>
- <item>20</item>
- <item>bb</item>
- </line>
- <line>
- <item>30</item>
- <item>cc</item>
- </line>
- <line>
- <item>40</item>
- <item>dd</item>
- </line>
- <line>
- <item>50</item>
- <item>ee</item>
- </line>
- </data>
- <attributes/>
- <GUI>
- <xloc>144</xloc>
- <yloc>96</yloc>
- </GUI>
- </transform>
<transform>
<name>Merge rows (diff)</name>
<type>MergeRows</type>
@@ -159,7 +94,7 @@ limitations under the License.
<method>none</method>
<schema_name/>
</partitioning>
- <connection>unit-test-db</connection>
+ <connection>${DATABASE_NAME}</connection>
<commit>100</commit>
<tablename_in_field>N</tablename_in_field>
<tablename_field/>
@@ -206,35 +141,70 @@ limitations under the License.
<method>none</method>
<schema_name/>
</partitioning>
+ <data>
+ <line>
+ <item>10</item>
+ <item>aa</item>
+ </line>
+ <line>
+ <item>30</item>
+ <item>cc</item>
+ </line>
+ <line>
+ <item>40</item>
+ <item>dd</item>
+ </line>
+ <line>
+ <item>50</item>
+ <item>e</item>
+ </line>
+ <line>
+ <item>60</item>
+ <item>ff</item>
+ </line>
+ </data>
<fields>
<field>
- <name>key</name>
- <type>String</type>
- <format/>
- <currency/>
- <decimal/>
- <group/>
<length>-1</length>
<precision>-1</precision>
<set_empty_string>N</set_empty_string>
+ <name>key</name>
+ <type>String</type>
</field>
<field>
- <name>value</name>
- <type>String</type>
- <format/>
- <currency/>
- <decimal/>
- <group/>
<length>-1</length>
<precision>-1</precision>
<set_empty_string>N</set_empty_string>
+ <name>value</name>
+ <type>String</type>
</field>
</fields>
+ <attributes/>
+ <GUI>
+ <xloc>336</xloc>
+ <yloc>224</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>source</name>
+ <type>DataGrid</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
<data>
<line>
<item>10</item>
<item>aa</item>
</line>
+ <line>
+ <item>20</item>
+ <item>bb</item>
+ </line>
<line>
<item>30</item>
<item>cc</item>
@@ -245,17 +215,29 @@ limitations under the License.
</line>
<line>
<item>50</item>
- <item>e</item>
- </line>
- <line>
- <item>60</item>
- <item>ff</item>
+ <item>ee</item>
</line>
</data>
+ <fields>
+ <field>
+ <length>-1</length>
+ <precision>-1</precision>
+ <set_empty_string>N</set_empty_string>
+ <name>key</name>
+ <type>String</type>
+ </field>
+ <field>
+ <length>-1</length>
+ <precision>-1</precision>
+ <set_empty_string>N</set_empty_string>
+ <name>value</name>
+ <type>String</type>
+ </field>
+ </fields>
<attributes/>
<GUI>
- <xloc>336</xloc>
- <yloc>224</yloc>
+ <xloc>144</xloc>
+ <yloc>96</yloc>
</GUI>
</transform>
<transform_error_handling>
diff --git
a/plugins/transforms/insertupdate/src/main/java/org/apache/hop/pipeline/transforms/insertupdate/InsertUpdateMeta.java
b/plugins/transforms/insertupdate/src/main/java/org/apache/hop/pipeline/transforms/insertupdate/InsertUpdateMeta.java
index ce6cd89110..4cd4c2bbc7 100644
---
a/plugins/transforms/insertupdate/src/main/java/org/apache/hop/pipeline/transforms/insertupdate/InsertUpdateMeta.java
+++
b/plugins/transforms/insertupdate/src/main/java/org/apache/hop/pipeline/transforms/insertupdate/InsertUpdateMeta.java
@@ -628,8 +628,7 @@ public class InsertUpdateMeta extends
BaseTransformMeta<InsertUpdate, InsertUpda
getParentTransformMeta().getParentPipelineMeta().findDatabase(connection,
variables);
if (databaseMeta != null) {
- Database db = new Database(loggingObject, variables, databaseMeta);
- try {
+ try (Database db = new Database(loggingObject, variables, databaseMeta))
{
db.connect();
if (!Utils.isEmpty(realTableName)) {
@@ -647,8 +646,6 @@ public class InsertUpdateMeta extends
BaseTransformMeta<InsertUpdate, InsertUpda
} catch (Exception e) {
throw new HopException(
BaseMessages.getString(PKG,
"InsertUpdateMeta.Exception.ErrorGettingFields"), e);
- } finally {
- db.disconnect();
}
} else {
throw new HopException(
diff --git
a/plugins/transforms/synchronizeaftermerge/src/main/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMerge.java
b/plugins/transforms/synchronizeaftermerge/src/main/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMerge.java
index 2850b15105..52a10b4a60 100644
---
a/plugins/transforms/synchronizeaftermerge/src/main/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMerge.java
+++
b/plugins/transforms/synchronizeaftermerge/src/main/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMerge.java
@@ -81,7 +81,7 @@ public class SynchronizeAfterMerge
meta.getOperationOrderField()));
}
- if (meta.istablenameInField()) {
+ if (meta.isTableNameInField()) {
// get dynamic table name
data.realTableName = data.inputRowMeta.getString(row,
data.indexOfTableNameField);
if (Utils.isEmpty(data.realTableName)) {
@@ -112,7 +112,7 @@ public class SynchronizeAfterMerge
insertRowData[i] = row[data.valuenrs[i]];
}
- if (meta.istablenameInField()) {
+ if (meta.isTableNameInField()) {
data.insertStatement =
data.preparedStatements.get(data.realSchemaTable + "insert");
if (data.insertStatement == null) {
String sql =
@@ -165,7 +165,7 @@ public class SynchronizeAfterMerge
// LOOKUP
- if (meta.istablenameInField()) {
+ if (meta.isTableNameInField()) {
// Prepare Lookup statement
data.lookupStatement =
data.preparedStatements.get(data.realSchemaTable + "lookup");
if (data.lookupStatement == null) {
@@ -241,7 +241,7 @@ public class SynchronizeAfterMerge
if (!meta.isPerformLookup() || updateorDelete) {
// UPDATE :
- if (meta.istablenameInField()) {
+ if (meta.isTableNameInField()) {
data.updateStatement =
data.preparedStatements.get(data.realSchemaTable + "update");
if (data.updateStatement == null) {
String sql = getUpdateStatement(data.inputRowMeta);
@@ -297,7 +297,7 @@ public class SynchronizeAfterMerge
} else if (operation.equals(data.deleteValue)) {
// DELETE
- if (meta.istablenameInField()) {
+ if (meta.isTableNameInField()) {
data.deleteStatement =
data.preparedStatements.get(data.realSchemaTable + "delete");
if (data.deleteStatement == null) {
@@ -574,15 +574,13 @@ public class SynchronizeAfterMerge
data.lookupParameterRowMeta = new RowMeta();
data.lookupReturnRowMeta = new RowMeta();
- DatabaseMeta databaseMeta = meta.getDatabaseMeta();
-
String sql = "SELECT ";
for (int i = 0; i < meta.getUpdateLookup().length; i++) {
if (i != 0) {
sql += ", ";
}
- sql += databaseMeta.quoteField(meta.getUpdateLookup()[i]);
+ sql += data.databaseMeta.quoteField(meta.getUpdateLookup()[i]);
data.lookupReturnRowMeta.addValueMeta(
rowMeta.searchValueMeta(meta.getUpdateStream()[i]).clone());
}
@@ -593,7 +591,7 @@ public class SynchronizeAfterMerge
if (i != 0) {
sql += " AND ";
}
- sql += databaseMeta.quoteField(meta.getKeyLookup()[i]);
+ sql += data.databaseMeta.quoteField(meta.getKeyLookup()[i]);
if ("BETWEEN".equalsIgnoreCase(meta.getKeyCondition()[i])) {
sql += " BETWEEN ? AND ? ";
data.lookupParameterRowMeta.addValueMeta(rowMeta.searchValueMeta(meta.getKeyStream()[i]));
@@ -613,7 +611,6 @@ public class SynchronizeAfterMerge
// Lookup certain fields in a table
public String getUpdateStatement(IRowMeta rowMeta) throws
HopDatabaseException {
- DatabaseMeta databaseMeta = meta.getDatabaseMeta();
data.updateParameterRowMeta = new RowMeta();
String sql = "UPDATE " + data.realSchemaTable + Const.CR;
@@ -629,7 +626,7 @@ public class SynchronizeAfterMerge
comma = true;
}
- sql += databaseMeta.quoteField(meta.getUpdateLookup()[i]);
+ sql += data.databaseMeta.quoteField(meta.getUpdateLookup()[i]);
sql += " = ?" + Const.CR;
data.updateParameterRowMeta.addValueMeta(
rowMeta.searchValueMeta(meta.getUpdateStream()[i]).clone());
@@ -642,7 +639,7 @@ public class SynchronizeAfterMerge
if (i != 0) {
sql += "AND ";
}
- sql += databaseMeta.quoteField(meta.getKeyLookup()[i]);
+ sql += data.databaseMeta.quoteField(meta.getKeyLookup()[i]);
if ("BETWEEN".equalsIgnoreCase(meta.getKeyCondition()[i])) {
sql += " BETWEEN ? AND ? ";
data.updateParameterRowMeta.addValueMeta(rowMeta.searchValueMeta(meta.getKeyStream()[i]));
@@ -660,7 +657,6 @@ public class SynchronizeAfterMerge
}
public String getDeleteStatement(IRowMeta rowMeta) throws
HopDatabaseException {
- DatabaseMeta databaseMeta = meta.getDatabaseMeta();
data.deleteParameterRowMeta = new RowMeta();
String sql = "DELETE FROM " + data.realSchemaTable + Const.CR;
@@ -671,7 +667,7 @@ public class SynchronizeAfterMerge
if (i != 0) {
sql += "AND ";
}
- sql += databaseMeta.quoteField(meta.getKeyLookup()[i]);
+ sql += data.databaseMeta.quoteField(meta.getKeyLookup()[i]);
if ("BETWEEN".equalsIgnoreCase(meta.getKeyCondition()[i])) {
sql += " BETWEEN ? AND ? ";
data.deleteParameterRowMeta.addValueMeta(rowMeta.searchValueMeta(meta.getKeyStream()[i]));
@@ -702,14 +698,14 @@ public class SynchronizeAfterMerge
data.inputRowMeta = data.outputRowMeta;
meta.getFields(data.outputRowMeta, getTransformName(), null, null, this,
metadataProvider);
- if (meta.istablenameInField()) {
+ if (meta.isTableNameInField()) {
// ICache the position of the table name field
if (data.indexOfTableNameField < 0) {
- data.indexOfTableNameField =
data.inputRowMeta.indexOfValue(meta.gettablenameField());
+ data.indexOfTableNameField =
data.inputRowMeta.indexOfValue(meta.getTableNameField());
if (data.indexOfTableNameField < 0) {
String message =
"It was not possible to find table ["
- + meta.gettablenameField()
+ + meta.getTableNameField()
+ "] in the input fields.";
logError(message);
throw new HopTransformException(message);
@@ -823,7 +819,7 @@ public class SynchronizeAfterMerge
}
}
- if (!meta.istablenameInField()) {
+ if (!meta.isTableNameInField()) {
// Prepare Lookup statement
if (meta.isPerformLookup()) {
data.lookupStatement =
data.preparedStatements.get(data.realSchemaTable + "lookup");
@@ -906,21 +902,28 @@ public class SynchronizeAfterMerge
public boolean init() {
if (super.init()) {
try {
+
+ DatabaseMeta databaseMeta =
getPipelineMeta().findDatabase(meta.getConnection(), variables);
+ if (databaseMeta == null) {
+ logError(
+ BaseMessages.getString(
+ PKG, "SynchronizeAfterMerge.Init.ConnectionMissing",
getTransformName()));
+ return false;
+ }
+
meta.normalizeAllocationFields();
data.realSchemaName = resolve(meta.getSchemaName());
- if (meta.istablenameInField()) {
- if (Utils.isEmpty(meta.gettablenameField())) {
+ if (meta.isTableNameInField()) {
+ if (Utils.isEmpty(meta.getTableNameField())) {
logError(
BaseMessages.getString(PKG,
"SynchronizeAfterMerge.Log.Error.TableFieldnameEmpty"));
return false;
}
}
- data.databaseMeta = meta.getDatabaseMeta();
-
// if we are using Oracle then set releaseSavepoint to false
// TODO: change when we remove those variants of IDatabase
- if (data.databaseMeta.getIDatabase().isOracleVariant()) {
+ if (databaseMeta.getIDatabase().isOracleVariant()) {
data.releaseSavepoint = false;
}
@@ -932,9 +935,9 @@ public class SynchronizeAfterMerge
//
data.specialErrorHandling =
getTransformMeta().isDoingErrorHandling()
- &&
meta.getDatabaseMeta().supportsErrorHandlingOnBatchUpdates();
+ && databaseMeta.supportsErrorHandlingOnBatchUpdates();
- data.supportsSavepoints =
meta.getDatabaseMeta().getIDatabase().isUseSafePoints();
+ data.supportsSavepoints =
databaseMeta.getIDatabase().isUseSafePoints();
if (data.batchMode && data.specialErrorHandling) {
data.batchMode = false;
@@ -943,13 +946,8 @@ public class SynchronizeAfterMerge
}
}
- if (meta.getDatabaseMeta() == null) {
- logError(
- BaseMessages.getString(
- PKG, "SynchronizeAfterMerge.Init.ConnectionMissing",
getTransformName()));
- return false;
- }
- data.db = new Database(this, this, meta.getDatabaseMeta());
+ data.databaseMeta = databaseMeta;
+ data.db = new Database(this, this, databaseMeta);
data.db.connect();
data.db.setCommit(data.commitSize);
diff --git
a/plugins/transforms/synchronizeaftermerge/src/main/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeDialog.java
b/plugins/transforms/synchronizeaftermerge/src/main/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeDialog.java
index 078035e3d4..8ea45cca2d 100644
---
a/plugins/transforms/synchronizeaftermerge/src/main/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeDialog.java
+++
b/plugins/transforms/synchronizeaftermerge/src/main/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeDialog.java
@@ -17,6 +17,7 @@
package org.apache.hop.pipeline.transforms.synchronizeaftermerge;
+import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
@@ -168,7 +169,7 @@ public class SynchronizeAfterMergeDialog extends
BaseTransformDialog implements
shell.setText(BaseMessages.getString(PKG,
"SynchronizeAfterMergeDialog.Shell.Title"));
int middle = props.getMiddlePct();
- int margin = props.getMargin();
+ int margin = PropsUi.getMargin();
// THE BUTTONS go at the bottom
wOk = new Button(shell, SWT.PUSH);
@@ -223,7 +224,7 @@ public class SynchronizeAfterMergeDialog extends
BaseTransformDialog implements
wGeneralComp.setLayout(generalLayout);
// Connection line
- wConnection = addConnectionLine(wGeneralComp, wTransformName,
input.getDatabaseMeta(), lsMod);
+ wConnection = addConnectionLine(wGeneralComp, wTransformName,
input.getConnection(), lsMod);
wConnection.addSelectionListener(lsSelection);
// Schema line...
@@ -788,7 +789,7 @@ public class SynchronizeAfterMergeDialog extends
BaseTransformDialog implements
}
// refresh data
- input.setDatabaseMeta(pipelineMeta.findDatabase(wConnection.getText(),
variables));
+ input.setConnection(wConnection.getText());
input.setTableName(variables.resolve(wTable.getText()));
ITransformMeta transformMetaInterface = transformMeta.getTransform();
try {
@@ -1011,9 +1012,9 @@ public class SynchronizeAfterMergeDialog extends
BaseTransformDialog implements
}
wCommit.setText(input.getCommitSize());
- wTablenameInField.setSelection(input.istablenameInField());
- if (input.gettablenameField() != null) {
- wTableField.setText(input.gettablenameField());
+ wTablenameInField.setSelection(input.isTableNameInField());
+ if (input.getTableNameField() != null) {
+ wTableField.setText(input.getTableNameField());
}
wBatch.setSelection(input.useBatchUpdate());
if (input.getOperationOrderField() != null) {
@@ -1071,8 +1072,8 @@ public class SynchronizeAfterMergeDialog extends
BaseTransformDialog implements
if (input.getTableName() != null) {
wTable.setText(input.getTableName());
}
- if (input.getDatabaseMeta() != null) {
- wConnection.setText(input.getDatabaseMeta().getName());
+ if (input.getConnection() != null) {
+ wConnection.setText(input.getConnection());
}
wKey.setRowNums();
@@ -1097,8 +1098,8 @@ public class SynchronizeAfterMergeDialog extends
BaseTransformDialog implements
inf.allocate(nrkeys, nrFields);
inf.setCommitSize(wCommit.getText());
- inf.settablenameInField(wTablenameInField.getSelection());
- inf.settablenameField(wTableField.getText());
+ inf.setTableNameInField(wTablenameInField.getSelection());
+ inf.setTableNameField(wTableField.getText());
inf.setUseBatchUpdate(wBatch.getSelection());
inf.setPerformLookup(wPerformLookup.getSelection());
@@ -1132,9 +1133,9 @@ public class SynchronizeAfterMergeDialog extends
BaseTransformDialog implements
inf.getUpdate()[i] = "Y".equals(item.getText(3));
}
+ inf.setConnection(wConnection.getText());
inf.setSchemaName(wSchema.getText());
inf.setTableName(wTable.getText());
- inf.setDatabaseMeta(pipelineMeta.findDatabase(wConnection.getText(),
variables));
transformName = wTransformName.getText(); // return value
}
@@ -1147,7 +1148,7 @@ public class SynchronizeAfterMergeDialog extends
BaseTransformDialog implements
// Get the information for the dialog into the input structure.
getInfo(input);
- if (input.getDatabaseMeta() == null) {
+ if (Strings.isNullOrEmpty(input.getConnection())) {
MessageBox mb = new MessageBox(shell, SWT.OK | SWT.ICON_ERROR);
mb.setMessage(
BaseMessages.getString(
@@ -1253,18 +1254,15 @@ public class SynchronizeAfterMergeDialog extends
BaseTransformDialog implements
info);
IRowMeta prev = pipelineMeta.getPrevTransformFields(variables,
transformName);
+ DatabaseMeta databaseMeta =
pipelineMeta.findDatabase(wConnection.getText(), variables);
+
SqlStatement sql =
info.getSqlStatements(variables, pipelineMeta, transformMeta, prev,
metadataProvider);
if (!sql.hasError()) {
if (sql.hasSql()) {
SqlEditor sqledit =
new SqlEditor(
- shell,
- SWT.NONE,
- variables,
- info.getDatabaseMeta(),
- DbCache.getInstance(),
- sql.getSql());
+ shell, SWT.NONE, variables, databaseMeta,
DbCache.getInstance(), sql.getSql());
sqledit.open();
} else {
MessageBox mb = new MessageBox(shell, SWT.OK | SWT.ICON_INFORMATION);
@@ -1292,8 +1290,7 @@ public class SynchronizeAfterMergeDialog extends
BaseTransformDialog implements
private void getSchemaNames() {
DatabaseMeta databaseMeta =
pipelineMeta.findDatabase(wConnection.getText(), variables);
if (databaseMeta != null) {
- Database database = new Database(loggingObject, variables, databaseMeta);
- try {
+ try (Database database = new Database(loggingObject, variables,
databaseMeta)) {
database.connect();
String[] schemas = database.getSchemas();
@@ -1329,8 +1326,6 @@ public class SynchronizeAfterMergeDialog extends
BaseTransformDialog implements
BaseMessages.getString(PKG, "System.Dialog.Error.Title"),
BaseMessages.getString(PKG,
"SynchronizeAfterMergeDialog.ErrorGettingSchemas"),
e);
- } finally {
- database.disconnect();
}
}
}
diff --git
a/plugins/transforms/synchronizeaftermerge/src/main/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeMeta.java
b/plugins/transforms/synchronizeaftermerge/src/main/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeMeta.java
index 3757fb278a..59d718d26c 100644
---
a/plugins/transforms/synchronizeaftermerge/src/main/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeMeta.java
+++
b/plugins/transforms/synchronizeaftermerge/src/main/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeMeta.java
@@ -68,11 +68,6 @@ public class SynchronizeAfterMergeMeta
@Injection(name = "TABLE_NAME")
private String tableName;
- private IHopMetadataProvider metadataProvider;
-
- /** database connection */
- private DatabaseMeta databaseMeta;
-
/** which field in input stream to compare with? */
@Injection(name = "STREAM_FIELD1", group = "KEYS_TO_LOOKUP")
private String[] keyStream;
@@ -129,17 +124,19 @@ public class SynchronizeAfterMergeMeta
@Injection(name = "ORDER_DELETE")
private String orderDelete;
+ @Injection(name = "CONNECTION_NAME")
+ private String connection;
+
public SynchronizeAfterMergeMeta() {
super(); // allocate BaseTransformMeta
}
- @Injection(name = "CONNECTION_NAME")
- public void setConnection(String connectionName) {
- try {
- databaseMeta = DatabaseMeta.loadDatabase(metadataProvider,
connectionName);
- } catch (HopXmlException e) {
- throw new RuntimeException("Error load connection '" + connectionName +
"'", e);
- }
+ public String getConnection() {
+ return connection;
+ }
+
+ public void setConnection(String connection) {
+ this.connection = connection;
}
/**
@@ -170,15 +167,15 @@ public class SynchronizeAfterMergeMeta
return performLookup;
}
- public boolean istablenameInField() {
+ public boolean isTableNameInField() {
return tablenameInField;
}
- public void settablenameInField(boolean tablenamefield) {
+ public void setTableNameInField(boolean tablenamefield) {
this.tablenameInField = tablenamefield;
}
- public String gettablenameField() {
+ public String getTableNameField() {
return tablenameField;
}
@@ -214,7 +211,7 @@ public class SynchronizeAfterMergeMeta
this.operationOrderField = operationOrderField;
}
- public void settablenameField(String tablenamefield) {
+ public void setTableNameField(String tablenamefield) {
this.tablenameField = tablenamefield;
}
@@ -239,20 +236,6 @@ public class SynchronizeAfterMergeMeta
this.commitSize = commitSize;
}
- /**
- * @return Returns the database.
- */
- public DatabaseMeta getDatabaseMeta() {
- return databaseMeta;
- }
-
- /**
- * @param database The database to set.
- */
- public void setDatabaseMeta(DatabaseMeta database) {
- this.databaseMeta = database;
- }
-
/**
* @return Returns the keyCondition.
*/
@@ -409,13 +392,10 @@ public class SynchronizeAfterMergeMeta
private void readData(Node transformNode, IHopMetadataProvider
metadataProvider)
throws HopXmlException {
- this.metadataProvider = metadataProvider;
try {
int nrkeys;
int nrvalues;
- this.databases = databases;
- String con = XmlHandler.getTagValue(transformNode, "connection");
- databaseMeta = DatabaseMeta.loadDatabase(metadataProvider, con);
+ connection = XmlHandler.getTagValue(transformNode, "connection");
commitSize = XmlHandler.getTagValue(transformNode, "commit");
schemaName = XmlHandler.getTagValue(transformNode, "lookup", "schema");
tableName = XmlHandler.getTagValue(transformNode, "lookup", "table");
@@ -483,7 +463,7 @@ public class SynchronizeAfterMergeMeta
tablenameField = null;
keyStream = null;
updateLookup = null;
- databaseMeta = null;
+ connection = null;
commitSize = "100";
schemaName = "";
tableName = BaseMessages.getString(PKG,
"SynchronizeAfterMergeMeta.DefaultTableName");
@@ -519,11 +499,7 @@ public class SynchronizeAfterMergeMeta
normalizeAllocationFields();
StringBuilder retval = new StringBuilder(200);
- retval
- .append(" ")
- .append(
- XmlHandler.addTagValue(
- "connection", databaseMeta == null ? "" :
databaseMeta.getName()));
+ retval.append(" ").append(XmlHandler.addTagValue("connection",
connection));
retval.append(" ").append(XmlHandler.addTagValue("commit", commitSize));
retval.append(" ").append(XmlHandler.addTagValue("tablename_in_field",
tablenameInField));
@@ -578,9 +554,9 @@ public class SynchronizeAfterMergeMeta
CheckResult cr;
String errorMessage = "";
+ DatabaseMeta databaseMeta = pipelineMeta.findDatabase(connection,
variables);
if (databaseMeta != null) {
- Database db = new Database(loggingObject, variables, databaseMeta);
- try {
+ try (Database db = new Database(loggingObject, variables, databaseMeta))
{
db.connect();
if (!Utils.isEmpty(tableName)) {
@@ -859,8 +835,6 @@ public class SynchronizeAfterMergeMeta
+ e.getMessage();
cr = new CheckResult(ICheckResult.TYPE_RESULT_ERROR, errorMessage,
transformMeta);
remarks.add(cr);
- } finally {
- db.disconnect();
}
} else {
errorMessage =
@@ -897,10 +871,14 @@ public class SynchronizeAfterMergeMeta
IRowMeta prev,
IHopMetadataProvider metadataProvider)
throws HopTransformException {
+
+ DatabaseMeta databaseMeta = pipelineMeta.findDatabase(connection,
variables);
+
SqlStatement retval =
new SqlStatement(transformMeta.getName(), databaseMeta, null); //
default: nothing to do!
if (databaseMeta != null) {
+
if (prev != null && prev.size() > 0) {
// Copy the row
IRowMeta tableFields = new RowMeta();
@@ -937,8 +915,8 @@ public class SynchronizeAfterMergeMeta
}
if (!Utils.isEmpty(tableName)) {
- Database db = new Database(loggingObject, variables, databaseMeta);
- try {
+
+ try (Database db = new Database(loggingObject, variables,
databaseMeta)) {
db.connect();
String schemaTable =
@@ -1010,7 +988,9 @@ public class SynchronizeAfterMergeMeta
IRowMeta info,
IHopMetadataProvider metadataProvider)
throws HopTransformException {
- if (prev != null) {
+
+ DatabaseMeta databaseMeta = pipelineMeta.findDatabase(connection,
variables);
+ if (databaseMeta != null && prev != null) {
// Lookup: we do a lookup on the natural keys
for (int i = 0; i < keyLookup.length; i++) {
IValueMeta v = prev.searchValueMeta(keyStream[i]);
@@ -1055,10 +1035,10 @@ public class SynchronizeAfterMergeMeta
public IRowMeta getRequiredFields(IVariables variables) throws HopException {
String realTableName = variables.resolve(tableName);
String realSchemaName = variables.resolve(schemaName);
-
+ DatabaseMeta databaseMeta =
+
getParentTransformMeta().getParentPipelineMeta().findDatabase(connection,
variables);
if (databaseMeta != null) {
- Database db = new Database(loggingObject, variables, databaseMeta);
- try {
+ try (Database db = new Database(loggingObject, variables, databaseMeta))
{
db.connect();
if (!Utils.isEmpty(realTableName)) {
@@ -1077,8 +1057,6 @@ public class SynchronizeAfterMergeMeta
throw new HopException(
BaseMessages.getString(PKG,
"SynchronizeAfterMergeMeta.Exception.ErrorGettingFields"),
e);
- } finally {
- db.disconnect();
}
} else {
throw new HopException(
diff --git
a/plugins/transforms/synchronizeaftermerge/src/test/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeMetaInjectionTest.java
b/plugins/transforms/synchronizeaftermerge/src/test/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeMetaInjectionTest.java
index 4bdc6f4e7b..7d81399e03 100644
---
a/plugins/transforms/synchronizeaftermerge/src/test/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeMetaInjectionTest.java
+++
b/plugins/transforms/synchronizeaftermerge/src/test/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeMetaInjectionTest.java
@@ -125,7 +125,7 @@ public class SynchronizeAfterMergeMetaInjectionTest
new IBooleanGetter() {
@Override
public boolean get() {
- return meta.istablenameInField();
+ return meta.isTableNameInField();
}
});
check(
@@ -133,7 +133,7 @@ public class SynchronizeAfterMergeMetaInjectionTest
new IStringGetter() {
@Override
public String get() {
- return meta.gettablenameField();
+ return meta.getTableNameField();
}
});
check(
diff --git
a/plugins/transforms/synchronizeaftermerge/src/test/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeMetaTest.java
b/plugins/transforms/synchronizeaftermerge/src/test/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeMetaTest.java
index 2555a1de0b..20e2191167 100644
---
a/plugins/transforms/synchronizeaftermerge/src/test/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeMetaTest.java
+++
b/plugins/transforms/synchronizeaftermerge/src/test/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeMetaTest.java
@@ -48,10 +48,10 @@ public class SynchronizeAfterMergeMetaTest implements
IInitializer<ITransform> {
Arrays.asList(
"schemaName",
"tableName",
- "databaseMeta",
+ "connection",
"commitSize",
"tableNameInField",
- "tablenameField",
+ "tableNameField",
"operationOrderField",
"useBatchUpdate",
"performLookup",
@@ -69,16 +69,16 @@ public class SynchronizeAfterMergeMetaTest implements
IInitializer<ITransform> {
Map<String, String> getterMap =
new HashMap<String, String>() {
{
- put("tableNameInField", "istablenameInField");
- put("tablenameField", "gettablenameField");
+ put("tableNameInField", "isTableNameInField");
+ put("tableNameField", "getTableNameField");
put("useBatchUpdate", "useBatchUpdate");
}
};
Map<String, String> setterMap =
new HashMap<String, String>() {
{
- put("tableNameInField", "settablenameInField");
- put("tablenameField", "settablenameField");
+ put("tableNameInField", "setTableNameInField");
+ put("tableNameField", "setTableNameField");
}
};
IFieldLoadSaveValidator<String[]> stringArrayLoadSaveValidator =
diff --git
a/plugins/transforms/synchronizeaftermerge/src/test/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeTest.java
b/plugins/transforms/synchronizeaftermerge/src/test/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeTest.java
index 23df739d4c..89c695774a 100644
---
a/plugins/transforms/synchronizeaftermerge/src/test/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeTest.java
+++
b/plugins/transforms/synchronizeaftermerge/src/test/java/org/apache/hop/pipeline/transforms/synchronizeaftermerge/SynchronizeAfterMergeTest.java
@@ -48,7 +48,7 @@ public class SynchronizeAfterMergeTest {
DatabaseMeta dbMeta = mock(DatabaseMeta.class);
doReturn(mock(NoneDatabaseMeta.class)).when(dbMeta).getIDatabase();
- doReturn(dbMeta).when(smi).getDatabaseMeta();
+ // doReturn(dbMeta).when(smi).getDatabaseMeta();
doReturn("${commit.size}").when(smi).getCommitSize();
PipelineMeta pipelineMeta = mock(PipelineMeta.class);