[ https://issues.apache.org/jira/browse/PIRK-21?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15432902#comment-15432902 ]

ASF GitHub Bot commented on PIRK-21:
------------------------------------

Github user ellisonanne commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/76#discussion_r75878104
  
    --- Diff: src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java ---
    @@ -0,0 +1,467 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.pirk.responder.wideskies.spark.streaming;
    +
    +import java.math.BigInteger;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Queue;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.io.MapWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    +import org.apache.pirk.inputformat.hadoop.BaseInputFormat;
    +import org.apache.pirk.inputformat.hadoop.InputFormatConst;
    +import org.apache.pirk.query.wideskies.Query;
    +import org.apache.pirk.query.wideskies.QueryInfo;
    +import org.apache.pirk.responder.wideskies.spark.Accumulators;
    +import org.apache.pirk.responder.wideskies.spark.BroadcastVars;
    +import org.apache.pirk.responder.wideskies.spark.EncColMultGroupedMapper;
    +import org.apache.pirk.responder.wideskies.spark.EncColMultReducer;
    +import org.apache.pirk.responder.wideskies.spark.EncRowCalc;
    +import org.apache.pirk.responder.wideskies.spark.FilterData;
    +import org.apache.pirk.responder.wideskies.spark.HashSelectorsAndPartitionData;
    +import org.apache.pirk.schema.data.DataSchema;
    +import org.apache.pirk.schema.data.DataSchemaLoader;
    +import org.apache.pirk.schema.data.DataSchemaRegistry;
    +import org.apache.pirk.schema.query.QuerySchema;
    +import org.apache.pirk.schema.query.QuerySchemaLoader;
    +import org.apache.pirk.schema.query.QuerySchemaRegistry;
    +import org.apache.pirk.serialization.HadoopFileSystemStore;
    +import org.apache.pirk.utils.PIRException;
    +import org.apache.pirk.utils.SystemConfiguration;
    +import org.apache.spark.SparkConf;
    +import org.apache.spark.api.java.JavaPairRDD;
    +import org.apache.spark.api.java.JavaRDD;
    +import org.apache.spark.api.java.JavaSparkContext;
    +import org.apache.spark.api.java.function.Function;
    +import org.apache.spark.api.java.function.VoidFunction;
    +import org.apache.spark.streaming.Durations;
    +import org.apache.spark.streaming.api.java.JavaDStream;
    +import org.apache.spark.streaming.api.java.JavaPairDStream;
    +import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    +import org.apache.spark.streaming.api.java.JavaStreamingContext;
    +import org.elasticsearch.hadoop.mr.EsInputFormat;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Master class for the PIR query Spark Streaming application
    + * <p>
    + * NOTE:
    + * <p>
    + * - NOT using Elasticsearch in practice - there proved to be speed issues with ES and Spark that appear to be ES-Spark specific; the code is left in
    + * anticipation that the ES-Spark issues are resolved...
    + * <p>
    + * - Even when rdd.count() calls are embedded in logger.debug statements, Spark still computes them. Thus, they are commented out in the code below;
    + * uncomment them for rdd.count() debugging.
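    + * <p>
    + * For example (editor's sketch, not part of the original diff): Java evaluates arguments eagerly, so
    + * {@code logger.debug("count = " + rdd.count())} launches a full Spark job even when debug logging is
    + * disabled; wrapping such calls in an {@code if (logger.isDebugEnabled())} guard avoids the cost.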
    + * 
    + */
    +public class ComputeStreamingResponse
    +{
    +  private static final Logger logger = LoggerFactory.getLogger(ComputeStreamingResponse.class);
    +
    +  private String dataInputFormat = null;
    +  private String inputData = null;
    +  private String outputFile = null;
    +  private String outputDirExp = null;
    +
    +  private String queryInput = null;
    +  QuerySchema qSchema = null;
    +
    +  private String esQuery = "none";
    +  private String esResource = "none";
    +
    +  private FileSystem fs = null;
    +  private HadoopFileSystemStore storage = null;
    +  private JavaStreamingContext jssc = null;
    +
    +  boolean useQueueStream = false;
    +
    +  private long batchSeconds = 0;
    +  private long windowLength = 0;
    +
    +  private Accumulators accum = null;
    +  private BroadcastVars bVars = null;
    +
    +  private QueryInfo queryInfo = null;
    +  Query query = null;
    +
    +  private int numDataPartitions = 0;
    +  private int numColMultPartitions = 0;
    +
    +  private boolean colMultReduceByKey = false;
    +
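    +  /**
    +   * Creates the streaming responder, pulling all required settings from SystemConfiguration
    +   * and validating the input format and the streaming window parameters.
    +   */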
    +  public ComputeStreamingResponse(FileSystem fileSys) throws Exception
    +  {
    +    fs = fileSys;
    +    storage = new HadoopFileSystemStore(fs);
    +
    +    dataInputFormat = SystemConfiguration.getProperty("pir.dataInputFormat");
    +    if (!InputFormatConst.ALLOWED_FORMATS.contains(dataInputFormat))
    +    {
    +      throw new IllegalArgumentException("inputFormat = " + dataInputFormat + " is of an unknown form");
    +    }
    +    logger.info("inputFormat = " + dataInputFormat);
    +    if (dataInputFormat.equals(InputFormatConst.BASE_FORMAT))
    +    {
    +      inputData = SystemConfiguration.getProperty("pir.inputData", "none");
    +      if (inputData.equals("none"))
    +      {
    +        throw new IllegalArgumentException("For inputFormat = " + dataInputFormat + " an inputFile must be specified");
    +      }
    +      logger.info("inputFile = " + inputData);
    +    }
    +    else if (dataInputFormat.equals(InputFormatConst.ES))
    +    {
    +      esQuery = SystemConfiguration.getProperty("pir.esQuery", "none");
    +      esResource = SystemConfiguration.getProperty("pir.esResource", "none");
    +      if (esQuery.equals("none"))
    +      {
    +        throw new IllegalArgumentException("esQuery must be specified");
    +      }
    +      if (esResource.equals("none"))
    +      {
    +        throw new IllegalArgumentException("esResource must be specified");
    +      }
    +      logger.info("esQuery = " + esQuery + " esResource = " + esResource);
    +    }
    +    outputFile = SystemConfiguration.getProperty("pir.outputFile");
    +    outputDirExp = outputFile + "_exp";
    +
    +    queryInput = SystemConfiguration.getProperty("pir.queryInput");
    +    String stopListFile = SystemConfiguration.getProperty("pir.stopListFile");
    +
    +    logger.info("outputFile = " + outputFile + " queryInputDir = " + 
queryInput + " stopListFile = " + stopListFile + " esQuery = " + esQuery
    +        + " esResource = " + esResource);
    +
    +    // Pull the batchSeconds and windowLength parameters
    +    batchSeconds = Long.parseLong(SystemConfiguration.getProperty("pir.sparkstreaming.batchSeconds", "30"));
    +    windowLength = Long.parseLong(SystemConfiguration.getProperty("pir.sparkstreaming.windowLength", "60"));
    +    if (windowLength % batchSeconds != 0)
    +    {
    +      throw new IllegalArgumentException("batchSeconds = " + batchSeconds 
+ " must divide windowLength = " + windowLength);
    +    }
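    +    // Whether to feed the input through a queue-based stream (queueStream) instead of a live input stream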
    +    useQueueStream = SystemConfiguration.getProperty("pir.sparkstreaming.useQueueStream", "false").equals("true");
    +
    +    // Set the necessary configurations
    +    SparkConf conf = new SparkConf().setAppName("SparkPIR").setMaster("yarn-cluster");
    +    conf.set("es.nodes", SystemConfiguration.getProperty("es.nodes", "none"));
    +    conf.set("es.port", SystemConfiguration.getProperty("es.port", "none"));
    +    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    +    conf.set("spark.streaming.stopGracefullyOnShutdown", SystemConfiguration.getProperty("spark.streaming.stopGracefullyOnShutdown", "false"));
    --- End diff ---
    
    Yes - I will update it with the new typing.
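
    For context, a minimal sketch of what the typed update might look like, assuming a
    hypothetical SystemConfiguration.getBooleanProperty helper (the actual method name and
    signature are not shown in this thread):

        // Hypothetical typed getter - the name and signature are assumptions, not a confirmed Pirk API
        boolean stopGracefully = SystemConfiguration.getBooleanProperty("spark.streaming.stopGracefullyOnShutdown", false);
        conf.set("spark.streaming.stopGracefullyOnShutdown", String.valueOf(stopGracefully));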


> Apache Spark Streaming Implementation
> -------------------------------------
>
>                 Key: PIRK-21
>                 URL: https://issues.apache.org/jira/browse/PIRK-21
>             Project: PIRK
>          Issue Type: Improvement
>          Components: Responder
>            Reporter: Ellison Anne Williams
>            Assignee: Ellison Anne Williams
>
> Provide a Spark Streaming implementation for Pirk.
> Although there is discussion and a forthcoming JIRA issue for Pirk 
> integration with Apache Beam, we can, at the very least, use this 
> implementation to benchmark straight Spark Streaming vs Beam + Spark 
> Streaming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
