tanmaya-panda1 commented on code in PR #7624:
URL: https://github.com/apache/nifi/pull/7624#discussion_r1381171499


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/PutAzureDataExplorer.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.data.explorer;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestDataFormat;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestService;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestionRequest;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestionResult;
+import org.apache.nifi.services.azure.data.explorer.KustoQueryResponse;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"Azure", "Kusto", "ADX", "Explorer", "Data"})
+@CapabilityDescription("The PutAzureDataExplorer acts as a ADX sink connector 
which sends flowFiles using the ADX-Service to the provided Azure Data" +
+        "Explorer Ingest Endpoint. The data can be sent through queued 
ingestion or streaming ingestion to the Azure Data Explorer cluster.")
+public class PutAzureDataExplorer extends AbstractProcessor {
+
+    public static final String FETCH_TABLE_COMMAND = "%s | count";
+    public static final String STREAMING_POLICY_SHOW_COMMAND = ".show %s %s 
policy streamingingestion";
+    public static final String DATABASE = "database";
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+    private transient KustoIngestService service;
+    private boolean streamingEnabled;
+    private boolean pollOnIngestionStatus;
+
+    public static final AllowableValue IGNORE_FIRST_RECORD_YES = new 
AllowableValue(
+            "YES", "YES",
+            "Ignore first record during ingestion");
+
+    public static final AllowableValue IGNORE_FIRST_RECORD_NO = new 
AllowableValue(
+            "NO", "NO",
+            "Do not ignore first record during ingestion");
+
+    public static final PropertyDescriptor DATABASE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Database Name")
+            .displayName("Database Name")
+            .description("Azure Data Explorer Database Name for querying")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Table Name")
+            .displayName("Table Name")
+            .description("Azure Data Explorer Table Name for ingesting data")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor MAPPING_NAME = new 
PropertyDescriptor
+            .Builder().name("Ingest Mapping name")
+            .displayName("Ingest Mapping name")
+            .description("The name of the mapping responsible for storing the 
data in the appropriate columns.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor IS_STREAMING_ENABLED = new 
PropertyDescriptor
+            .Builder().name("Streaming enabled")
+            .displayName("Streaming enabled")
+            .description("This property determines whether we want to stream 
data to ADX.")
+            .required(false)
+            .allowableValues("true", "false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue("false")
+            .build();
+
+    public static final PropertyDescriptor POLL_ON_INGESTION_STATUS = new 
PropertyDescriptor
+            .Builder().name("Poll on ingestion status")
+            .displayName("Whether to Poll on ingestion status")
+            .description("This property determines whether we want to poll on 
ingestion status after an ingestion to ADX is completed")
+            .required(false)
+            .allowableValues("true", "false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue("false")
+            .build();
+    public static final PropertyDescriptor ADX_SERVICE = new PropertyDescriptor
+            .Builder().name("Kusto Ingest Service")
+            .displayName("Kusto Ingest Service")
+            .description("Azure Data Explorer Kusto Ingest Service")
+            .required(true)
+            .identifiesControllerService(KustoIngestService.class)
+            .build();
+
+    static final PropertyDescriptor DATA_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("Data Format")
+            .displayName("Data Format")
+            .description("The format of the data that is sent to Azure Data 
Explorer.")
+            .required(true)
+            .allowableValues(KustoIngestDataFormat.values())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor IGNORE_FIRST_RECORD = new 
PropertyDescriptor.Builder()
+            .name("Ingestion Ignore First Record")
+            .displayName("Ingestion Ignore First Record")
+            .description("Defines whether ignore first record while 
ingestion.")
+            .required(false)
+            .allowableValues(IGNORE_FIRST_RECORD_YES, IGNORE_FIRST_RECORD_NO)
+            .defaultValue(IGNORE_FIRST_RECORD_NO.getValue())
+            .build();
+
+    static final PropertyDescriptor ROUTE_PARTIALLY_SUCCESSFUL_INGESTION = new 
PropertyDescriptor.Builder()
+            .name("Route partially successful ingestion records")
+            .displayName("Route partially successful ingestion records")
+            .description("Defines where to route partially successful 
ingestion records.")
+            .required(false)
+            .allowableValues("Success", "Failure")
+            .defaultValue("Failure")
+            .build();
+
+    static final PropertyDescriptor INGESTION_STATUS_POLLING_TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("Timeout for polling on ingestion status")
+            .displayName("Timeout for polling on ingestion status in seconds")
+            .description("Defines the value of timeout for polling on 
ingestion status in seconds")
+            .required(false)
+            .dependsOn(POLL_ON_INGESTION_STATUS, "true")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("600")
+            .build();
+
+    static final PropertyDescriptor INGESTION_STATUS_POLLING_INTERVAL = new 
PropertyDescriptor.Builder()
+            .name("Ingestion status polling interval")
+            .displayName("Ingestion status polling interval in seconds")
+            .description("Defines the value of timeout for polling on 
ingestion status in seconds.")
+            .required(false)
+            .dependsOn(POLL_ON_INGESTION_STATUS, "true")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("5")
+            .build();
+
+    public static final Relationship SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Relationship for success")
+            .build();
+    public static final Relationship FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Relationship for failure")
+            .build();
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptorList = new ArrayList<>();
+        descriptorList.add(ADX_SERVICE);
+        descriptorList.add(DATABASE_NAME);
+        descriptorList.add(TABLE_NAME);
+        descriptorList.add(MAPPING_NAME);
+        descriptorList.add(DATA_FORMAT);
+        descriptorList.add(IGNORE_FIRST_RECORD);
+        descriptorList.add(IS_STREAMING_ENABLED);
+        descriptorList.add(POLL_ON_INGESTION_STATUS);
+        descriptorList.add(ROUTE_PARTIALLY_SUCCESSFUL_INGESTION);
+        descriptorList.add(INGESTION_STATUS_POLLING_TIMEOUT);
+        descriptorList.add(INGESTION_STATUS_POLLING_INTERVAL);
+        this.descriptors = Collections.unmodifiableList(descriptorList);
+
+        final Set<Relationship> relationshipSet = new HashSet<>();
+        relationshipSet.add(SUCCESS);
+        relationshipSet.add(FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationshipSet);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        service = 
context.getProperty(ADX_SERVICE).asControllerService(KustoIngestService.class);
+        if 
(checkIfIngestorRoleDoesntExist(context.getProperty(DATABASE_NAME).getValue(), 
context.getProperty(TABLE_NAME).getValue())) {
+            getLogger().error("User might not have ingestor privileges, table 
validation will be skipped for all table mappings.");
+            throw new ProcessException("User might not have ingestor 
privileges, table validation will be skipped for all table mappings. ");

Review Comment:
   Since without ingestor privilege, there is no point in starting the 
ingestion of data, hence we throw an exception. Also removed skipping of table 
validation message which might confuse the user.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to