zhipeng93 commented on code in PR #114:
URL: https://github.com/apache/flink-ml/pull/114#discussion_r904510188
##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java:
##
@@ -80,24 +82,68 @@ public Table[] transform(Table... inputs) {
return new Table[] {outputTable};
}
-private static class AssemblerFunc implements FlatMapFunction {
+private static class AssemblerFunc extends RichFlatMapFunction {
private final String[] inputCols;
private final String handleInvalid;
+/** The indices for assembling vectors. */
+private transient IntArrayList indices;
+/** The values for assembling vectors. */
+private transient DoubleArrayList values;
+
public AssemblerFunc(String[] inputCols, String handleInvalid) {
this.inputCols = inputCols;
this.handleInvalid = handleInvalid;
}
@Override
-public void flatMap(Row value, Collector out) throws Exception {
+public void open(Configuration parameters) throws Exception {
+super.open(parameters);
+indices = new IntArrayList();
+values = new DoubleArrayList();
+}
+
+@Override
+public void flatMap(Row value, Collector out) {
+int offset = 0;
try {
-Object[] objects = new Object[inputCols.length];
-for (int i = 0; i < objects.length; ++i) {
-objects[i] = value.getField(inputCols[i]);
+for (String inputCol : inputCols) {
+Object object = value.getField(inputCol);
+Preconditions.checkNotNull(object, "Input column value
should not be null.");
+if (object instanceof Number) {
+indices.add(offset++);
+values.add(((Number) object).doubleValue());
+} else if (object instanceof SparseVector) {
+SparseVector sparseVector = (SparseVector) object;
+for (int i = 0; i < sparseVector.indices.length; ++i) {
+indices.add(sparseVector.indices[i] + offset);
+values.add(sparseVector.values[i]);
+}
+offset += sparseVector.size();
+} else if (object instanceof DenseVector) {
+DenseVector denseVector = (DenseVector) object;
+for (int i = 0; i < denseVector.size(); ++i) {
+indices.add(offset + i);
+values.add(denseVector.values[i]);
+}
+offset += denseVector.size();
+} else {
+throw new IllegalArgumentException(
+"Input type has not been supported yet.");
+}
+}
+
+Vector assembledVec =
+new SparseVector(
Review Comment:
Marked as resolved, since I removed the use of `it.unimi.dsi.fastutil.*`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org