This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new b0f29ef94e NIFI-14958 Support Array of Strings for Avro Schema derived from JDBC Results (#10295)
b0f29ef94e is described below
commit b0f29ef94e95be8160ec2cd5fbdfbef373451f90
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Sep 16 17:01:36 2025 +0200
NIFI-14958 Support Array of Strings for Avro Schema derived from JDBC Results (#10295)
Signed-off-by: David Handermann <[email protected]>
---
.../java/org/apache/nifi/util/db/JdbcCommon.java | 13 ++++++-
.../processors/standard/TestExecuteSQLRecord.java | 42 ++++++++++++++++++++++
2 files changed, 54 insertions(+), 1 deletion(-)
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java b/nifi-extension-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
index f0cbf07668..227ed1de01 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
@@ -664,11 +664,22 @@ public class JdbcCommon {
case BINARY:
case VARBINARY:
case LONGVARBINARY:
- case ARRAY:
case BLOB:
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault();
break;
+ case ARRAY: {
+ builder.name(columnName)
+ .type()
+ .unionOf()
+ .nullBuilder().endNull()
+ .and().bytesType()
+ .and().type(Schema.createArray(Schema.create(Schema.Type.STRING)))
+ .endUnion()
+ .noDefault();
+ break;
+ }
+
case -150: // SQLServer may return -150 from the driver even though it's really -156 (sql_variant), treat as a union since we don't know what the values will actually be
case -156:
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().and().intType().and().longType().and().booleanType().and().bytesType().and()
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index 374006173a..9b63a13537 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -20,13 +20,19 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.db.JdbcCommon.AvroConversionOptions;
import org.apache.nifi.util.db.SimpleCommerceDataSet;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -35,18 +41,23 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Types;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
@@ -649,6 +660,37 @@ public class TestExecuteSQLRecord {
firstFlowFile.assertContentEquals("test");
}
+ @Test
+ public void testArrayOfStringsInference() throws Exception {
+ final ResultSetMetaData meta = mock(ResultSetMetaData.class);
+ when(meta.getColumnCount()).thenReturn(1);
+ when(meta.getColumnLabel(1)).thenReturn("test");
+ when(meta.getColumnName(1)).thenReturn("test");
+ when(meta.getColumnType(1)).thenReturn(Types.ARRAY);
+ when(meta.getTableName(1)).thenReturn("");
+
+ final ResultSet rs = mock(ResultSet.class);
+ when(rs.getMetaData()).thenReturn(meta);
+ when(rs.next()).thenReturn(true, false);
+
+ final Array array = mock(Array.class);
+ when(array.getArray()).thenReturn(new String[] {"test"});
+ when(rs.getArray(1)).thenReturn(array);
+ when(rs.getObject(1)).thenReturn(array);
+ when(rs.getObject("test")).thenReturn(array);
+
+ final TestRunner localRunner = TestRunners.newTestRunner(ExecuteSQLRecord.class);
+ final RecordSetWriterFactory writerFactory = new JsonRecordSetWriter();
+ localRunner.addControllerService("writer", writerFactory);
+ localRunner.enableControllerService(writerFactory);
+
+ final RecordSqlWriter sqlWriter = new RecordSqlWriter(writerFactory, AvroConversionOptions.builder().useLogicalTypes(false).build(), 0, Map.of());
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ final ComponentLog log = new MockComponentLog("test", sqlWriter);
+ sqlWriter.writeResultSet(rs, out, log, null);
+ final String json = out.toString();
+ assertTrue(json.contains("\"test\":[\"test\"]"), "Expected JSON to contain array of strings: " + json);
+ }
/**
* Simple implementation only for ExecuteSQL processor testing.