mcvsubbu commented on a change in pull request #5597:
URL: https://github.com/apache/incubator-pinot/pull/5597#discussion_r443152690



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.data.function.FunctionEvaluator;
+import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
+import org.apache.pinot.spi.config.table.IngestionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Utility methods for extracting source and destination fields from ingestion configs
+ */
+public class IngestionUtils {
+
+  /**
+   * Extracts all fields required by the {@link org.apache.pinot.spi.data.readers.RecordExtractor} from the given TableConfig and Schema
+   */
+  public static Set<String> getFieldsForRecordExtractor(TableConfig tableConfig, Schema schema) {

Review comment:
       Consider making the two methods below public, for readability:
   
   getFieldsToFilterOn()
   getFieldsToExtract()
   
   Or, keep one in the Schema class and the other here, but that may lead callers to ignore this one.
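A rough sketch of the split the reviewer suggests. The types here are simplified stand-ins (plain collections instead of Pinot's Schema, TableConfig, and FunctionEvaluator), and the method bodies are hypothetical; only the shape of the two public helpers is the point:

```java
import java.util.*;

class IngestionFieldsSketch {

  // Fields referenced by the filter function (stand-in for parsing FilterConfig).
  static Set<String> getFieldsToFilterOn(List<String> filterArguments) {
    return new HashSet<>(filterArguments);
  }

  // Fields declared in the schema; for a field with a transform expression,
  // read the transform's source arguments instead of the field itself.
  static Set<String> getFieldsToExtract(Map<String, List<String>> fieldToTransformArgs) {
    Set<String> fields = new HashSet<>();
    for (Map.Entry<String, List<String>> e : fieldToTransformArgs.entrySet()) {
      if (e.getValue().isEmpty()) {
        fields.add(e.getKey());          // no transform: read the column itself
      } else {
        fields.addAll(e.getValue());     // transform: read the source arguments
      }
    }
    return fields;
  }

  // The existing entry point would then just union the two public helpers.
  static Set<String> getFieldsForRecordExtractor(List<String> filterArgs,
      Map<String, List<String>> fieldToTransformArgs) {
    Set<String> all = new HashSet<>(getFieldsToFilterOn(filterArgs));
    all.addAll(getFieldsToExtract(fieldToTransformArgs));
    return all;
  }
}
```

Keeping the union in one entry point while exposing the two helpers gives callers both the convenience method and the readable pieces.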

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
##########
@@ -245,7 +245,8 @@ public void run() {
             if (consumedRow != null) {
               try {
                GenericRow transformedRow = _recordTransformer.transform(consumedRow);
-                if (transformedRow != null) {
+                // FIXME: MULTIPLE_RECORDS_KEY is not handled here

Review comment:
       On this note, can you add some comments to GenericRow explaining what MULTIPLE_RECORDS_KEY is used for (in the GenericRow class), and also document it in the decoders description in the documentation? It seems to be used only in test files now, and there is no doc on what it means. Thanks.
   
   It looks like the MULTIPLE stuff is a decoder feature. So, when you add a FIXME like this, does it mean that decoders using the multiple-records key will break? Or will they simply not be able to use filters? When is this planned to be fixed? Is this just an interim state from which we should take care not to cut a release? Is this work in progress?
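For reference, the convention being discussed works roughly like the sketch below: a decoder that explodes one stream message into several rows stores them as a Collection under a sentinel key, and the consumer unpacks each one. GenericRow is mocked here as a plain Map, and the key's literal value is an assumption, not the real constant:

```java
import java.util.*;

class MultipleRecordsSketch {
  // Hypothetical sentinel; the real constant lives on GenericRow.
  static final String MULTIPLE_RECORDS_KEY = "$MULTIPLE_RECORDS_KEY$";

  // Decoder side: wrap N logical rows inside one envelope row.
  static Map<String, Object> decodeToEnvelope(List<Map<String, Object>> rows) {
    Map<String, Object> envelope = new HashMap<>();
    envelope.put(MULTIPLE_RECORDS_KEY, rows);
    return envelope;
  }

  // Consumer side: unpack the envelope if present, else treat it as one row.
  @SuppressWarnings("unchecked")
  static List<Map<String, Object>> unpack(Map<String, Object> decodedRow) {
    Object multiple = decodedRow.get(MULTIPLE_RECORDS_KEY);
    if (multiple != null) {
      return new ArrayList<>((Collection<Map<String, Object>>) multiple);
    }
    return Collections.singletonList(decodedRow);
  }
}
```

This also makes the FIXME concrete: any code path that transforms the envelope row without unpacking it first silently skips the contained records.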

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -471,7 +471,7 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
           if (decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
             for (Object singleRow : (Collection) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
               GenericRow transformedRow = _recordTransformer.transform((GenericRow) singleRow);
-              if (transformedRow != null) {
+              if (transformedRow != null && IngestionUtils.passedFilter(transformedRow)) {

Review comment:
       ```suggestion
              if (transformedRow != null && IngestionUtils.isRowPruned(transformedRow)) {
   ```
   Or, `IngestionUtils.shouldIngestRow()`?
   
   Just a suggestion, you decide
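The naming question matters because the configured filter function and the call-site check point in opposite directions. A minimal sketch of the `shouldIngestRow()` option, using java.util.function.Predicate as a stand-in for Pinot's FunctionEvaluator:

```java
import java.util.Map;
import java.util.function.Predicate;

class FilterNamingSketch {
  // Returns true when the row should be KEPT. Note the inversion: per the
  // FilterConfig description, the configured function returns true for rows
  // that should be dropped. The name makes the call site read naturally:
  //   if (transformedRow != null && shouldIngestRow(...)) { index(...); }
  static boolean shouldIngestRow(Map<String, Object> row,
      Predicate<Map<String, Object>> filterFunction) {
    return filterFunction == null || !filterFunction.test(row);
  }
}
```

A name like `isRowPruned()` would force a negation at every call site, which is where `passedFilter` vs. `shouldIngestRow` readability arguments come from.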

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -1185,12 +1185,12 @@ public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata,
             .setConsumerDir(consumerDir);
 
     // Create message decoder
-    _messageDecoder =
-        StreamDecoderProvider.create(_partitionLevelStreamConfig, SchemaUtils.extractSourceFields(_schema));
+    _messageDecoder = StreamDecoderProvider
+        .create(_partitionLevelStreamConfig, IngestionUtils.getFieldsForRecordExtractor(_tableConfig, _schema));

Review comment:
       Same comment as before

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
##########
@@ -169,9 +169,9 @@ public HLRealtimeSegmentDataManager(final RealtimeSegmentZKMetadata realtimeSegm
     // create and init stream level consumer
     StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig);
     String clientId = HLRealtimeSegmentDataManager.class.getSimpleName() + "-" + _streamConfig.getTopicName();
-    _streamLevelConsumer = streamConsumerFactory
-        .createStreamLevelConsumer(clientId, _tableNameWithType, SchemaUtils.extractSourceFields(schema),
-            instanceMetadata.getGroupId(_tableNameWithType));
+    _streamLevelConsumer = streamConsumerFactory.createStreamLevelConsumer(clientId, _tableNameWithType,
+        IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema),

Review comment:
       I think I have the answer to my question: this is because we may have filter expressions on columns that are not present in the schema.
   If this explanation is right, can you please document it some place? (Perhaps in the IngestionUtils class?) Also, it may be worthwhile to make two separate calls, and let the caller decide whether to union the fields or use them independently. Not sure if there is a use case, but think about it. It certainly makes the code more readable: getFieldsToExtractFromStream() and getFieldsToFilterOn().
   Lastly, is it enough to pass in the IngestionConfig (or maybe even just the FilterConfig) rather than the entire TableConfig? This would keep the discipline of adding any ingestion-related item to IngestionConfig rather than looking somewhere else in the TableConfig.
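The narrowing suggested in the last paragraph can be sketched as follows; FilterConfig here is a minimal hypothetical stand-in (a list of argument fields rather than a parsed function string), so only the signature discipline is being illustrated:

```java
import java.util.*;

class NarrowConfigSketch {
  // Minimal stand-in for the proposed FilterConfig: just the fields the
  // filter function references. A real implementation would parse the
  // configured function string to obtain these.
  static final class FilterConfig {
    final List<String> argumentFields;
    FilterConfig(List<String> argumentFields) {
      this.argumentFields = argumentFields;
    }
  }

  // Narrow signature: only the ingestion-specific config is accepted, so
  // filter settings cannot come from anywhere else in TableConfig.
  static Set<String> getFieldsToFilterOn(FilterConfig filterConfig) {
    return filterConfig == null ? Collections.emptySet()
        : new HashSet<>(filterConfig.argumentFields);
  }
}
```

Accepting the smallest config type that suffices also makes the method trivially usable from batch jobs that have no TableConfig at hand.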

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/SchemaUtils.java
##########
@@ -41,28 +39,6 @@
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SchemaUtils.class);
 
-  /**
-   * Extracts the source fields and destination fields from the schema
-   * For field specs with a transform expression defined, use the arguments provided to the function
-   * By default, add the field spec name
-   *
-   * TODO: for now, we assume that arguments to transform function are in the source i.e. there's no columns which are derived from transformed columns
-   */
-  public static Set<String> extractSourceFields(Schema schema) {

Review comment:
       Spark/Hadoop jobs that depend on this method may break. Consider deprecating it instead of removing it, or at least add a backward-incompatibility note.
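The deprecation route could look roughly like this: keep extractSourceFields as a @Deprecated shim that delegates to the replacement. The signatures are simplified stand-ins (a map of field name to transform arguments instead of the real Schema), so this is a sketch of the compatibility pattern, not the actual Pinot API:

```java
import java.util.*;

class SchemaUtilsShimSketch {
  /**
   * @deprecated Kept so that external Spark/Hadoop jobs compiled against this
   * API keep working; delegates to the replacement below.
   */
  @Deprecated
  static Set<String> extractSourceFields(Map<String, List<String>> fieldToTransformArgs) {
    return getFieldsToExtract(fieldToTransformArgs);
  }

  // Stand-in for the new IngestionUtils-based extraction: a field with a
  // transform expression contributes its source arguments, others contribute
  // their own name.
  static Set<String> getFieldsToExtract(Map<String, List<String>> fieldToTransformArgs) {
    Set<String> fields = new HashSet<>();
    fieldToTransformArgs.forEach((field, args) -> {
      if (args.isEmpty()) {
        fields.add(field);
      } else {
        fields.addAll(args);
      }
    });
    return fields;
  }
}
```

The shim can then be removed in a later release after callers have had a deprecation cycle to migrate.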

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/FilterConfig.java
##########
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table.ingestion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+/**
+ * Configs related to filtering records during ingestion
+ */
+public class FilterConfig extends BaseJsonConfig {
+
+  @JsonPropertyDescription("Filter function string. Filter out records during ingestion, if this evaluates to true")
+  private final String _filterFunction;

Review comment:
       Shouldn't this be a list of filter functions, applied in the sequence specified? How would we indicate a filter like:
   1. Sample x% of the records
   2. Filter out those with age > 40
   
   Or:
   1. Filter out those with age > 40
   2. Sample x% of these records
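The ordering question is real because a sampler is stateful. A small sketch of the list-of-filters idea: each predicate returns true to DROP the row (matching the FilterConfig description above), and a counter-based sampler makes the two orderings produce different results. All names here are hypothetical:

```java
import java.util.*;
import java.util.function.Predicate;

class FilterChainSketch {
  // Apply each filter in the configured order; the first match drops the row.
  static <T> List<T> ingest(List<T> rows, List<Predicate<T>> filtersInOrder) {
    List<T> kept = new ArrayList<>();
    for (T row : rows) {
      boolean dropped = false;
      for (Predicate<T> filter : filtersInOrder) {
        if (filter.test(row)) {
          dropped = true;
          break;   // later filters never see this row
        }
      }
      if (!dropped) {
        kept.add(row);
      }
    }
    return kept;
  }

  // A counter-based 1-in-n sampler: keeps every n-th row it is asked about.
  // Because it is stateful, its position in the chain changes the outcome.
  static <T> Predicate<T> dropAllButEveryNth(int n) {
    int[] count = {0};
    return row -> (count[0]++ % n) != 0;
  }
}
```

With "sample, then filter age > 40" the sampler counts every row; with "filter, then sample" it only counts the survivors, so the kept sets differ even for the same input.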

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
##########
@@ -169,9 +169,9 @@ public HLRealtimeSegmentDataManager(final RealtimeSegmentZKMetadata realtimeSegm
     // create and init stream level consumer
     StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig);
     String clientId = HLRealtimeSegmentDataManager.class.getSimpleName() + "-" + _streamConfig.getTopicName();
-    _streamLevelConsumer = streamConsumerFactory
-        .createStreamLevelConsumer(clientId, _tableNameWithType, SchemaUtils.extractSourceFields(schema),
-            instanceMetadata.getGroupId(_tableNameWithType));
+    _streamLevelConsumer = streamConsumerFactory.createStreamLevelConsumer(clientId, _tableNameWithType,
+        IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema),
Review comment:
       High-level comment, recording it here so that I don't forget. Intuitively, the rows ingested should be indicated by the schema, not by whether the row is filtered or not. So, I am not sure what this change implies.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
