This is an automated email from the ASF dual-hosted git repository. fmariani pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit b593a6068da79bdb61c48e493717a74b2675a83e Author: Salvatore Mongiardo <[email protected]> AuthorDate: Fri Mar 13 16:15:21 2026 +0100 Camel-Milvus: create rag helpers to be used as beans --- .../helpers/MilvusHelperCreateCollection.java | 195 +++++++++++++++++++++ .../milvus/helpers/MilvusHelperCreateIndex.java | 106 +++++++++++ .../milvus/helpers/MilvusHelperDelete.java | 69 ++++++++ .../helpers/MilvusHelperFieldMappingUtil.java | 51 ++++++ .../milvus/helpers/MilvusHelperInsert.java | 99 +++++++++++ .../helpers/MilvusHelperResultExtractor.java | 85 +++++++++ .../milvus/helpers/MilvusHelperSearch.java | 116 ++++++++++++ .../milvus/helpers/MilvusHelperUpsert.java | 99 +++++++++++ .../milvus/MilvusCreateCollectionTest.java | 57 ++---- .../component/milvus/it/MilvusComponentIT.java | 151 +++++++--------- 10 files changed, 900 insertions(+), 128 deletions(-) diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateCollection.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateCollection.java new file mode 100644 index 000000000000..19f81db37008 --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateCollection.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import io.milvus.grpc.DataType; +import io.milvus.param.collection.CreateCollectionParam; +import io.milvus.param.collection.FieldType; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.milvus.MilvusAction; +import org.apache.camel.component.milvus.MilvusHeaders; + +/** + * A Camel {@link Processor} that builds a Milvus {@link io.milvus.param.collection.CreateCollectionParam} from simple + * string properties and sets it as the exchange body together with the {@link MilvusAction#CREATE_COLLECTION} header. + */ +public class MilvusHelperCreateCollection implements Processor { + + private String collectionName = "default_collection"; + private String collectionDescription = "Default collection"; + private String idFieldName = "id"; + private String dimension = "768"; + private String textFieldName = "content"; + private String textFieldDataType = "VarChar"; + private String vectorFieldName = "embedding"; + private String vectorDataType = "FloatVector"; + private String textFieldMaxLength = "2048"; + private String additionalTextFields; + + @Override + public void process(Exchange exchange) throws Exception { + int vectorDim = Integer.parseInt(dimension); + int maxLength = Integer.parseInt(textFieldMaxLength); + DataType textDataType = DataType.valueOf(textFieldDataType); + + FieldType idField = FieldType.newBuilder() + .withName(idFieldName) + .withDataType(DataType.Int64) + .withPrimaryKey(true) + .withAutoID(true) + .build(); + + FieldType.Builder textFieldBuilder = FieldType.newBuilder() + .withName(textFieldName) + .withDataType(textDataType); + if (textDataType == DataType.VarChar) { + textFieldBuilder.withMaxLength(maxLength); + } + FieldType textField = textFieldBuilder.build(); + + DataType vecDataType = DataType.valueOf(vectorDataType); + + FieldType vectorField = FieldType.newBuilder() + .withName(vectorFieldName) + .withDataType(vecDataType) + .withDimension(vectorDim) + .build(); + + CreateCollectionParam.Builder builder = CreateCollectionParam.newBuilder() + .withCollectionName(collectionName) + .withDescription(collectionDescription) + .addFieldType(idField) + .addFieldType(textField); + + if (additionalTextFields != null && !additionalTextFields.isBlank()) { + for (String fieldName : additionalTextFields.split(",")) { + String trimmed = fieldName.trim(); + if (!trimmed.isEmpty()) { + FieldType extraField = FieldType.newBuilder() + .withName(trimmed) + .withDataType(DataType.VarChar) + .withMaxLength(maxLength) + .build(); + builder.addFieldType(extraField); + } + } + } + + builder.addFieldType(vectorField); + + exchange.getIn().setBody(builder.build()); + exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.CREATE_COLLECTION); + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public String getCollectionDescription() { + return collectionDescription; + } + + public void setCollectionDescription(String collectionDescription) { + this.collectionDescription = collectionDescription; + } + + public String getIdFieldName() { + return idFieldName; + } + + public void setIdFieldName(String idFieldName) { + this.idFieldName = idFieldName; + } + + public String getDimension() { + return dimension; + } + + /** + * @param dimension the vector dimension as a string (e.g., {@code 768}, {@code 1536}) + */ + public void setDimension(String dimension) { + this.dimension = dimension; + } + + public String getTextFieldName() { + return textFieldName; + } + + public void setTextFieldName(String textFieldName) { + this.textFieldName = textFieldName; + } + + public String getTextFieldDataType() { + return textFieldDataType; + } + + /** + * @param textFieldDataType the Milvus {@link io.milvus.grpc.DataType} enum name (e.g., {@code VarChar}, + * {@code Int8}) + */ + public void setTextFieldDataType(String textFieldDataType) { + this.textFieldDataType = textFieldDataType; + } + + public String getVectorDataType() { + return vectorDataType; + } + + /** + * @param vectorDataType the Milvus {@link io.milvus.grpc.DataType} enum name for the vector field (e.g., + * {@code FloatVector}, {@code BinaryVector}, {@code Float16Vector}) + */ + public void setVectorDataType(String vectorDataType) { + this.vectorDataType = vectorDataType; + } + + public String getVectorFieldName() { + return vectorFieldName; + } + + public void setVectorFieldName(String vectorFieldName) { + this.vectorFieldName = vectorFieldName; + } + + public String getTextFieldMaxLength() { + return textFieldMaxLength; + } + + /** + * @param textFieldMaxLength the maximum length for VarChar fields as a string (e.g., {@code 2048}) + */ + public void setTextFieldMaxLength(String textFieldMaxLength) { + this.textFieldMaxLength = textFieldMaxLength; + } + + public String getAdditionalTextFields() { + return additionalTextFields; + } + + /** + * @param additionalTextFields comma-separated list of extra VarChar field names (e.g., {@code title,author}) + */ + public void setAdditionalTextFields(String additionalTextFields) { + this.additionalTextFields = additionalTextFields; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateIndex.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateIndex.java new file mode 100644 index 000000000000..3f749e802c19 --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateIndex.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import io.milvus.param.IndexType; +import io.milvus.param.MetricType; +import io.milvus.param.index.CreateIndexParam; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.milvus.MilvusAction; +import org.apache.camel.component.milvus.MilvusHeaders; + +/** + * A Camel {@link Processor} that builds a Milvus {@link io.milvus.param.index.CreateIndexParam} from simple string + * properties and sets it as the exchange body together with the {@link MilvusAction#CREATE_INDEX} header. + */ +public class MilvusHelperCreateIndex implements Processor { + + private String collectionName = "default_collection"; + private String vectorFieldName = "embedding"; + private String indexType = "IVF_FLAT"; + private String metricType = "COSINE"; + private String extraParam = "{\"nlist\": 128}"; + + @Override + public void process(Exchange exchange) throws Exception { + IndexType idxType = IndexType.valueOf(indexType); + MetricType metric = MetricType.valueOf(metricType); + + CreateIndexParam param = CreateIndexParam.newBuilder() + .withCollectionName(collectionName) + .withFieldName(vectorFieldName) + .withIndexType(idxType) + .withMetricType(metric) + .withExtraParam(extraParam) + .withSyncMode(Boolean.TRUE) + .build(); + + exchange.getIn().setBody(param); + exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.CREATE_INDEX); + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public String getVectorFieldName() { + return vectorFieldName; + } + + public void setVectorFieldName(String vectorFieldName) { + this.vectorFieldName = vectorFieldName; + } + + public String getIndexType() { + return indexType; + } + + /** + * @param indexType the Milvus {@link io.milvus.param.IndexType} enum name (e.g., {@code IVF_FLAT}, {@code HNSW}) + */ + public void setIndexType(String indexType) { + this.indexType = indexType; + } + + public String getMetricType() { + return metricType; + } + + /** + * @param metricType the Milvus {@link io.milvus.param.MetricType} enum name (e.g., {@code COSINE}, {@code L2}, + * {@code IP}) + */ + public void setMetricType(String metricType) { + this.metricType = metricType; + } + + public String getExtraParam() { + return extraParam; + } + + /** + * @param extraParam JSON string with index-specific parameters (e.g., {@code {"nlist": 128}}) + */ + public void setExtraParam(String extraParam) { + this.extraParam = extraParam; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperDelete.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperDelete.java new file mode 100644 index 000000000000..6a37abaca3bf --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperDelete.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import io.milvus.param.dml.DeleteParam; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.milvus.MilvusAction; +import org.apache.camel.component.milvus.MilvusHeaders; + +/** + * A Camel {@link Processor} that builds a Milvus {@link io.milvus.param.dml.DeleteParam} from simple string properties + * and sets it as the exchange body together with the {@link MilvusAction#DELETE} header. + */ +public class MilvusHelperDelete implements Processor { + + private String collectionName = "default_collection"; + private String filter; + + @Override + public void process(Exchange exchange) throws Exception { + if (filter == null || filter.isEmpty()) { + throw new IllegalArgumentException( + "A filter expression is required for delete operations (e.g., \"id in [1, 2, 3]\")"); + } + + DeleteParam param = DeleteParam.newBuilder() + .withCollectionName(collectionName) + .withExpr(filter) + .build(); + + exchange.getIn().setBody(param); + exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.DELETE); + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public String getFilter() { + return filter; + } + + /** + * @param filter a Milvus boolean expression to select entities to delete (e.g., {@code id in [1, 2, 3]}, + * {@code age > 18}) + */ + public void setFilter(String filter) { + this.filter = filter; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperFieldMappingUtil.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperFieldMappingUtil.java new file mode 100644 index 000000000000..0cadfc955844 --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperFieldMappingUtil.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Shared utility for parsing {@code textFieldMappings} strings used by {@link MilvusHelperInsert} and + * {@link MilvusHelperUpsert}. + */ +final class MilvusHelperFieldMappingUtil { + + private MilvusHelperFieldMappingUtil() { + } + + /** + * Parses a comma-separated mapping string into an ordered map of field name to variable name. + * + * @param textFieldMappings the mappings string (e.g., {@code field1=var1,field2=var2}) + * @return an ordered map from field name to variable name + * @throws IllegalArgumentException if a mapping entry does not contain exactly one {@code =} separator + */ + static Map<String, String> parseMappings(String textFieldMappings) { + Map<String, String> result = new LinkedHashMap<>(); + for (String mapping : textFieldMappings.split(",")) { + String[] pair = mapping.trim().split("="); + if (pair.length != 2) { + throw new IllegalArgumentException( + "Invalid textFieldMappings entry: '" + mapping.trim() + + "'. Expected format: fieldName=variableName"); + } + result.put(pair[0].trim(), pair[1].trim()); + } + return result; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperInsert.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperInsert.java new file mode 100644 index 000000000000..3578425dc214 --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperInsert.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import io.milvus.param.dml.InsertParam; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.milvus.MilvusAction; +import org.apache.camel.component.milvus.MilvusHeaders; + +/** + * A Camel {@link Processor} that builds a Milvus {@link io.milvus.param.dml.InsertParam} from the exchange body vector + * and exchange variables mapped via {@link #setTextFieldMappings(String)}, then sets it as the exchange body together + * with the {@link MilvusAction#INSERT} header. + */ +public class MilvusHelperInsert implements Processor { + + private String collectionName = "default_collection"; + private String vectorFieldName = "embedding"; + private String textFieldMappings = "content=text"; + + @SuppressWarnings("unchecked") + @Override + public void process(Exchange exchange) throws Exception { + List<Float> vector = exchange.getIn().getBody(List.class); + if (vector == null) { + throw new IllegalArgumentException("Exchange body must contain a List<Float> vector, but was null"); + } + + Map<String, String> mappings = MilvusHelperFieldMappingUtil.parseMappings(textFieldMappings); + + List<InsertParam.Field> fields = new ArrayList<>(); + for (Map.Entry<String, String> entry : mappings.entrySet()) { + String value = exchange.getVariable(entry.getValue(), String.class); + if (value == null) { + throw new IllegalArgumentException( + "Exchange variable '" + entry.getValue() + "' is not set (mapped from field '" + entry.getKey() + "')"); + } + fields.add(new InsertParam.Field(entry.getKey(), Collections.singletonList(value))); + } + + fields.add(new InsertParam.Field(vectorFieldName, Collections.singletonList(vector))); + + InsertParam param = InsertParam.newBuilder() + .withCollectionName(collectionName) + .withFields(fields) + .build(); + + exchange.getIn().setBody(param); + exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.INSERT); + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public String getVectorFieldName() { + return vectorFieldName; + } + + public void setVectorFieldName(String vectorFieldName) { + this.vectorFieldName = vectorFieldName; + } + + public String getTextFieldMappings() { + return textFieldMappings; + } + + /** + * @param textFieldMappings comma-separated mappings of Milvus field names to exchange variable names (e.g., + * {@code field1=var1,field2=var2}) + */ + public void setTextFieldMappings(String textFieldMappings) { + this.textFieldMappings = textFieldMappings; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperResultExtractor.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperResultExtractor.java new file mode 100644 index 000000000000..f2831a9f3bae --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperResultExtractor.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import io.milvus.param.highlevel.dml.response.SearchResponse; +import io.milvus.response.QueryResultsWrapper; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; + +/** + * A Camel {@link Processor} that extracts field values from a Milvus + * {@link io.milvus.param.highlevel.dml.response.SearchResponse} into a list of ranked maps. Each map contains a + * {@code rank} entry and the requested output field values. The extracted list is set as the exchange body. + */ +public class MilvusHelperResultExtractor implements Processor { + + private String outputFields = "content"; + + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody(extract(exchange)); + } + + public List<Map<String, Object>> extract(Exchange exchange) { + SearchResponse response = exchange.getIn().getBody(SearchResponse.class); + List<Map<String, Object>> extracted = new ArrayList<>(); + + String[] fields = outputFields.split(","); + + if (response != null) { + List<QueryResultsWrapper.RowRecord> records = response.getRowRecords(0); + int rank = 1; + for (QueryResultsWrapper.RowRecord record : records) { + Map<String, Object> item = new LinkedHashMap<>(); + item.put("rank", rank++); + Object score = record.get("score"); + if (score != null) { + item.put("score", score); + } + for (String field : fields) { + String trimmed = field.trim(); + if (!trimmed.isEmpty()) { + Object value = record.get(trimmed); + if (value != null) { + item.put(trimmed, value); + } + } + } + extracted.add(item); + } + } + return extracted; + } + + public String getOutputFields() { + return outputFields; + } + + /** + * @param outputFields comma-separated list of field names to extract from search results (e.g., + * {@code content,title}) + */ + public void setOutputFields(String outputFields) { + this.outputFields = outputFields; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperSearch.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperSearch.java new file mode 100644 index 000000000000..e13e184f8bec --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperSearch.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import java.util.ArrayList; +import java.util.List; + +import io.milvus.common.clientenum.ConsistencyLevelEnum; +import io.milvus.param.highlevel.dml.SearchSimpleParam; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.milvus.MilvusAction; +import org.apache.camel.component.milvus.MilvusHeaders; + +/** + * A Camel {@link Processor} that builds a Milvus {@link io.milvus.param.highlevel.dml.SearchSimpleParam} from the + * exchange body vector and simple string properties, then sets it as the exchange body together with the + * {@link MilvusAction#SEARCH} header. + */ +public class MilvusHelperSearch implements Processor { + + private String collectionName = "default_collection"; + private String outputFields = "content"; + private String limit = "10"; + private String filter; + + @SuppressWarnings("unchecked") + @Override + public void process(Exchange exchange) throws Exception { + List<Float> queryEmbedding = exchange.getIn().getBody(List.class); + if (queryEmbedding == null) { + throw new IllegalArgumentException("Exchange body must contain a List<Float> vector, but was null"); + } + long searchLimit = Long.parseLong(limit); + + List<String> fields = new ArrayList<>(); + for (String field : outputFields.split(",")) { + String trimmed = field.trim(); + if (!trimmed.isEmpty()) { + fields.add(trimmed); + } + } + + SearchSimpleParam.Builder builder = SearchSimpleParam.newBuilder() + .withCollectionName(collectionName) + .withVectors(queryEmbedding) + .withLimit(searchLimit) + .withOutputFields(fields) + .withConsistencyLevel(ConsistencyLevelEnum.STRONG); + + if (filter != null && !filter.isEmpty()) { + builder.withFilter(filter); + } + + SearchSimpleParam param = builder.build(); + + exchange.getIn().setBody(param); + exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.SEARCH); + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public String getOutputFields() { + return outputFields; + } + + /** + * @param outputFields comma-separated list of field names to include in search results (e.g., + * {@code content,title}) + */ + public void setOutputFields(String outputFields) { + this.outputFields = outputFields; + } + + public String getLimit() { + return limit; + } + + /** + * @param limit the maximum number of results to return as a string (e.g., {@code 10}, {@code 100}) + */ + public void setLimit(String limit) { + this.limit = limit; + } + + public String getFilter() { + return filter; + } + + /** + * @param filter a Milvus boolean expression to filter results (e.g., {@code age > 18}, {@code status == "active"}) + */ + public void setFilter(String filter) { + this.filter = filter; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperUpsert.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperUpsert.java new file mode 100644 index 000000000000..df6c1d7c24c0 --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperUpsert.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import io.milvus.param.dml.UpsertParam; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.milvus.MilvusAction; +import org.apache.camel.component.milvus.MilvusHeaders; + +/** + * A Camel {@link Processor} that builds a Milvus {@link io.milvus.param.dml.UpsertParam} from the exchange body vector + * and exchange variables mapped via {@link #setTextFieldMappings(String)}, then sets it as the exchange body together + * with the {@link MilvusAction#UPSERT} header. + */ +public class MilvusHelperUpsert implements Processor { + + private String collectionName = "default_collection"; + private String vectorFieldName = "embedding"; + private String textFieldMappings = "content=text"; + + @SuppressWarnings("unchecked") + @Override + public void process(Exchange exchange) throws Exception { + List<Float> vector = exchange.getIn().getBody(List.class); + if (vector == null) { + throw new IllegalArgumentException("Exchange body must contain a List<Float> vector, but was null"); + } + + Map<String, String> mappings = MilvusHelperFieldMappingUtil.parseMappings(textFieldMappings); + + List<UpsertParam.Field> fields = new ArrayList<>(); + for (Map.Entry<String, String> entry : mappings.entrySet()) { + String value = exchange.getVariable(entry.getValue(), String.class); + if (value == null) { + throw new IllegalArgumentException( + "Exchange variable '" + entry.getValue() + "' is not set (mapped from field '" + entry.getKey() + "')"); + } + fields.add(new UpsertParam.Field(entry.getKey(), Collections.singletonList(value))); + } + + fields.add(new UpsertParam.Field(vectorFieldName, Collections.singletonList(vector))); + + UpsertParam param = UpsertParam.newBuilder() + .withCollectionName(collectionName) + .withFields(fields) + .build(); + + exchange.getIn().setBody(param); + exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.UPSERT); + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public String getVectorFieldName() { + return vectorFieldName; + } + + public void setVectorFieldName(String vectorFieldName) { + this.vectorFieldName = vectorFieldName; + } + + public String getTextFieldMappings() { + return textFieldMappings; + } + + /** + * @param textFieldMappings comma-separated mappings of Milvus field names to exchange variable names (e.g., + * {@code field1=var1,field2=var2}) + */ + public void setTextFieldMappings(String textFieldMappings) { + this.textFieldMappings = textFieldMappings; + } +} diff --git a/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/MilvusCreateCollectionTest.java b/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/MilvusCreateCollectionTest.java index a75119addc33..1918defe59c9 100644 --- a/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/MilvusCreateCollectionTest.java +++ b/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/MilvusCreateCollectionTest.java @@ -17,12 +17,10 @@ package org.apache.camel.component.milvus; -import io.milvus.grpc.DataType; -import io.milvus.param.collection.CollectionSchemaParam; -import io.milvus.param.collection.CreateCollectionParam; -import io.milvus.param.collection.FieldType; import org.apache.camel.Exchange; import org.apache.camel.NoSuchHeaderException; +import org.apache.camel.component.milvus.helpers.MilvusHelperCreateCollection; +import org.apache.camel.support.DefaultExchange; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -32,43 +30,22 @@ public class MilvusCreateCollectionTest extends MilvusTestSupport { @DisplayName("Tests that trying to create a collection without passing the action name triggers a failure") @Test - public void createCollectionWithoutRequiredParameters() { - FieldType fieldType1 = FieldType.newBuilder() - .withName("userID") - .withDescription("user identification") - .withDataType(DataType.Int64) - .withPrimaryKey(true) - .withAutoID(true) - .build(); - - FieldType fieldType2 = FieldType.newBuilder() - .withName("userFace") - .withDescription("face embedding") - .withDataType(DataType.FloatVector) - .withDimension(64) - .build(); - - FieldType fieldType3 = FieldType.newBuilder() - .withName("userAge") - .withDescription("user age") - .withDataType(DataType.Int8) - .build(); - - CreateCollectionParam createCollectionReq = CreateCollectionParam.newBuilder() - .withCollectionName("test") - .withDescription("customer info") - .withShardsNum(2) - .withSchema(CollectionSchemaParam.newBuilder() - .withEnableDynamicField(false) - .addFieldType(fieldType1) - .addFieldType(fieldType2) - .addFieldType(fieldType3) - .build()) - .build(); - + public void createCollectionWithoutRequiredParameters() throws Exception { + MilvusHelperCreateCollection ragCreateCollection = new MilvusHelperCreateCollection(); + ragCreateCollection.setCollectionName("test"); + ragCreateCollection.setCollectionDescription("customer info"); + ragCreateCollection.setIdFieldName("userID"); + ragCreateCollection.setVectorFieldName("userFace"); + ragCreateCollection.setTextFieldName("userAge"); + ragCreateCollection.setTextFieldDataType("Int8"); + ragCreateCollection.setDimension("64"); + + Exchange tempExchange = new DefaultExchange(context); + ragCreateCollection.process(tempExchange); + + // Send body without the action header to trigger failure Exchange result = fluentTemplate.to("milvus:createCollection") - .withBody( - createCollectionReq) + .withBody(tempExchange.getIn().getBody()) .request(Exchange.class); assertThat(result).isNotNull(); diff --git a/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java b/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java index cab3064b0852..6d12b6f21b46 100644 --- a/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java +++ b/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java @@ -21,24 +21,22 @@ import java.util.List; import java.util.Random; import io.milvus.common.clientenum.ConsistencyLevelEnum; -import io.milvus.grpc.DataType; import io.milvus.grpc.QueryResults; import io.milvus.param.IndexType; -import io.milvus.param.MetricType; -import io.milvus.param.collection.CollectionSchemaParam; -import io.milvus.param.collection.CreateCollectionParam; -import io.milvus.param.collection.FieldType; -import io.milvus.param.dml.DeleteParam; import io.milvus.param.dml.InsertParam; import io.milvus.param.dml.QueryParam; import io.milvus.param.dml.UpsertParam; -import io.milvus.param.highlevel.dml.SearchSimpleParam; import io.milvus.param.highlevel.dml.response.SearchResponse; import io.milvus.param.index.CreateIndexParam; import org.apache.camel.Exchange; import org.apache.camel.component.milvus.MilvusAction; import org.apache.camel.component.milvus.MilvusHeaders; import org.apache.camel.component.milvus.MilvusTestSupport; +import org.apache.camel.component.milvus.helpers.MilvusHelperCreateCollection; +import org.apache.camel.component.milvus.helpers.MilvusHelperCreateIndex; +import org.apache.camel.component.milvus.helpers.MilvusHelperDelete; +import org.apache.camel.component.milvus.helpers.MilvusHelperSearch; +import org.apache.camel.support.DefaultExchange; import org.assertj.core.util.Lists; import org.junit.jupiter.api.*; @@ -48,43 +46,22 @@ import static org.assertj.core.api.Assertions.assertThat; public class MilvusComponentIT extends MilvusTestSupport { @Test @Order(1) - public void createCollection() { - FieldType fieldType1 = FieldType.newBuilder() - .withName("userID") - .withDescription("user identification") - .withDataType(DataType.Int64) - .withPrimaryKey(true) - .withAutoID(true) - .build(); - - FieldType fieldType2 = FieldType.newBuilder() - .withName("userFace") - .withDescription("face embedding") - .withDataType(DataType.FloatVector) - .withDimension(64) - .build(); - - FieldType fieldType3 = FieldType.newBuilder() - .withName("userAge") - .withDescription("user age") - .withDataType(DataType.Int8) - .build(); - - CreateCollectionParam createCollectionReq = CreateCollectionParam.newBuilder() - .withCollectionName("test") - .withDescription("customer info") - .withShardsNum(2) - .withSchema(CollectionSchemaParam.newBuilder() - .withEnableDynamicField(false) - .addFieldType(fieldType1) - .addFieldType(fieldType2) - .addFieldType(fieldType3).build()) - .build(); + public void createCollection() throws Exception { + MilvusHelperCreateCollection ragCreateCollection = new MilvusHelperCreateCollection(); + ragCreateCollection.setCollectionName("test"); + ragCreateCollection.setCollectionDescription("customer info"); + ragCreateCollection.setIdFieldName("userID"); + ragCreateCollection.setVectorFieldName("userFace"); + ragCreateCollection.setTextFieldName("userAge"); + ragCreateCollection.setTextFieldDataType("Int8"); + ragCreateCollection.setDimension("64"); + + Exchange tempExchange = new DefaultExchange(context); + ragCreateCollection.process(tempExchange); Exchange result = fluentTemplate.to("milvus:test") - .withHeader(MilvusHeaders.ACTION, MilvusAction.CREATE_COLLECTION) - .withBody( - createCollectionReq) + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) .request(Exchange.class); assertThat(result).isNotNull(); @@ -93,7 +70,7 @@ public class MilvusComponentIT extends MilvusTestSupport { @Test @Order(2) - public void createIndex() { + public void createIndex() throws Exception { CreateIndexParam createAgeIndexParam = CreateIndexParam.newBuilder() .withCollectionName("test") .withFieldName("userAge") @@ -110,20 +87,19 @@ public class MilvusComponentIT extends MilvusTestSupport { assertThat(result).isNotNull(); assertThat(result.getException()).isNull(); - CreateIndexParam createVectorIndexParam = CreateIndexParam.newBuilder() - .withCollectionName("test") - .withFieldName("userFace") - .withIndexName("userFaceIndex") - .withIndexType(IndexType.IVF_FLAT) - .withMetricType(MetricType.L2) - .withExtraParam("{\"nlist\":128}") - .withSyncMode(Boolean.TRUE) - .build(); + MilvusHelperCreateIndex ragCreateIndex = new MilvusHelperCreateIndex(); + ragCreateIndex.setCollectionName("test"); + ragCreateIndex.setVectorFieldName("userFace"); + ragCreateIndex.setIndexType("IVF_FLAT"); + ragCreateIndex.setMetricType("L2"); + ragCreateIndex.setExtraParam("{\"nlist\":128}"); + + Exchange tempExchange = new DefaultExchange(context); + ragCreateIndex.process(tempExchange); result = fluentTemplate.to("milvus:test") - .withHeader(MilvusHeaders.ACTION, MilvusAction.CREATE_INDEX) - .withBody( - createVectorIndexParam) + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) .request(Exchange.class); assertThat(result).isNotNull(); @@ -187,21 +163,20 @@ public class MilvusComponentIT extends MilvusTestSupport { @Test @Order(5) - public void search() { - SearchSimpleParam searchSimpleParam = SearchSimpleParam.newBuilder() - .withCollectionName("test") - .withVectors(generateFloatVector()) - .withFilter("userAge>0") - .withLimit(100L) - .withOffset(0L) - .withOutputFields(Lists.newArrayList("userAge")) - .withConsistencyLevel(ConsistencyLevelEnum.STRONG) - .build(); + public void search() throws Exception { + MilvusHelperSearch ragSearch = new MilvusHelperSearch(); + ragSearch.setCollectionName("test"); + ragSearch.setOutputFields("userAge"); + ragSearch.setFilter("userAge>0"); + ragSearch.setLimit("100"); + + Exchange tempExchange = new DefaultExchange(context); + tempExchange.getIn().setBody(generateFloatVector()); + ragSearch.process(tempExchange); Exchange result = fluentTemplate.to("milvus:test") - .withHeader(MilvusHeaders.ACTION, MilvusAction.SEARCH) - .withBody( - searchSimpleParam) + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) .request(Exchange.class); assertThat(result).isNotNull(); @@ -232,35 +207,35 @@ public class MilvusComponentIT extends MilvusTestSupport { @Test @Order(7) - public void delete() { - DeleteParam delete = DeleteParam.newBuilder() - .withCollectionName("test") - .withExpr("userAge>0") - .build(); + public void delete() throws Exception { + MilvusHelperDelete ragDelete = new MilvusHelperDelete(); + ragDelete.setCollectionName("test"); + ragDelete.setFilter("userAge>0"); + + Exchange tempExchange = new DefaultExchange(context); + ragDelete.process(tempExchange); Exchange result = fluentTemplate.to("milvus:test") - .withHeader(MilvusHeaders.ACTION, MilvusAction.DELETE) - .withBody( - delete) + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) .request(Exchange.class); assertThat(result).isNotNull(); assertThat(result.getException()).isNull(); - SearchSimpleParam searchSimpleParam = SearchSimpleParam.newBuilder() - .withCollectionName("test") - .withVectors(generateFloatVector()) - .withFilter("userAge>0") - .withLimit(100L) - .withOffset(0L) - .withOutputFields(Lists.newArrayList("userAge")) - .withConsistencyLevel(ConsistencyLevelEnum.STRONG) - .build(); + MilvusHelperSearch ragSearch = new MilvusHelperSearch(); + ragSearch.setCollectionName("test"); + ragSearch.setOutputFields("userAge"); + ragSearch.setFilter("userAge>0"); + ragSearch.setLimit("100"); + + tempExchange = new DefaultExchange(context); + tempExchange.getIn().setBody(generateFloatVector()); + ragSearch.process(tempExchange); result = fluentTemplate.to("milvus:test") - .withHeader(MilvusHeaders.ACTION, MilvusAction.SEARCH) - .withBody( - searchSimpleParam) + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) .request(Exchange.class); assertThat(result).isNotNull();
