[ https://issues.apache.org/jira/browse/PIRK-21?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15432902#comment-15432902 ]
ASF GitHub Bot commented on PIRK-21: ------------------------------------ Github user ellisonanne commented on a diff in the pull request: https://github.com/apache/incubator-pirk/pull/76#discussion_r75878104 --- Diff: src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java --- @@ -0,0 +1,467 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pirk.responder.wideskies.spark.streaming; + +import java.math.BigInteger; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.pirk.inputformat.hadoop.BaseInputFormat; +import org.apache.pirk.inputformat.hadoop.InputFormatConst; +import org.apache.pirk.query.wideskies.Query; +import org.apache.pirk.query.wideskies.QueryInfo; +import org.apache.pirk.responder.wideskies.spark.Accumulators; +import org.apache.pirk.responder.wideskies.spark.BroadcastVars; +import org.apache.pirk.responder.wideskies.spark.EncColMultGroupedMapper; +import org.apache.pirk.responder.wideskies.spark.EncColMultReducer; +import org.apache.pirk.responder.wideskies.spark.EncRowCalc; +import org.apache.pirk.responder.wideskies.spark.FilterData; +import org.apache.pirk.responder.wideskies.spark.HashSelectorsAndPartitionData; +import org.apache.pirk.schema.data.DataSchema; +import org.apache.pirk.schema.data.DataSchemaLoader; +import org.apache.pirk.schema.data.DataSchemaRegistry; +import org.apache.pirk.schema.query.QuerySchema; +import org.apache.pirk.schema.query.QuerySchemaLoader; +import org.apache.pirk.schema.query.QuerySchemaRegistry; +import org.apache.pirk.serialization.HadoopFileSystemStore; +import org.apache.pirk.utils.PIRException; +import org.apache.pirk.utils.SystemConfiguration; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.VoidFunction; +import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaPairInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.elasticsearch.hadoop.mr.EsInputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Master class for the PIR query spark streaming application + * <p> + * NOTE: + * <p> + * - NOT using Elasticsearch in practice - proved to be some speed issues with ES and Spark that appear to be ES-Spark specific - leave code in anticipation + * that the ES-Spark issues resolve... + * <p> + * - Even if rdd.count() calls are embedded in logger.debug statements, they are computed by Spark. Thus, they are commented out in the code below - uncomment + * for rdd.count() debug + * + */ +public class ComputeStreamingResponse +{ + private static final Logger logger = LoggerFactory.getLogger(ComputeStreamingResponse.class); + + private String dataInputFormat = null; + private String inputData = null; + private String outputFile = null; + private String outputDirExp = null; + + private String queryInput = null; + QuerySchema qSchema = null; + + private String esQuery = "none"; + private String esResource = "none"; + + private FileSystem fs = null; + private HadoopFileSystemStore storage = null; + private JavaStreamingContext jssc = null; + + boolean useQueueStream = false; + + private long batchSeconds = 0; + private long windowLength = 0; + + private Accumulators accum = null; + private BroadcastVars bVars = null; + + private QueryInfo queryInfo = null; + Query query = null; + + private int numDataPartitions = 0; + private int numColMultPartitions = 0; + + private boolean colMultReduceByKey = false; + + public ComputeStreamingResponse(FileSystem fileSys) throws Exception + { + fs = fileSys; + storage = new HadoopFileSystemStore(fs); + + dataInputFormat = SystemConfiguration.getProperty("pir.dataInputFormat"); + if (!InputFormatConst.ALLOWED_FORMATS.contains(dataInputFormat)) + { + throw new IllegalArgumentException("inputFormat = " + dataInputFormat + " is of an unknown form"); + } + logger.info("inputFormat = " + dataInputFormat); + if (dataInputFormat.equals(InputFormatConst.BASE_FORMAT)) + { + inputData = SystemConfiguration.getProperty("pir.inputData", "none"); + if (inputData.equals("none")) + { + throw new IllegalArgumentException("For inputFormat = " + dataInputFormat + " an inputFile must be specified"); + } + logger.info("inputFile = " + inputData); + } + else if (dataInputFormat.equals(InputFormatConst.ES)) + { + esQuery = SystemConfiguration.getProperty("pir.esQuery", "none"); + esResource = SystemConfiguration.getProperty("pir.esResource", "none"); + if (esQuery.equals("none")) + { + throw new IllegalArgumentException("esQuery must be specified"); + } + if (esResource.equals("none")) + { + throw new IllegalArgumentException("esResource must be specified"); + } + logger.info("esQuery = " + esQuery + " esResource = " + esResource); + } + outputFile = SystemConfiguration.getProperty("pir.outputFile"); + outputDirExp = outputFile + "_exp"; + + queryInput = SystemConfiguration.getProperty("pir.queryInput"); + String stopListFile = SystemConfiguration.getProperty("pir.stopListFile"); + + logger.info("outputFile = " + outputFile + " queryInputDir = " + queryInput + " stopListFile = " + stopListFile + " esQuery = " + esQuery + + " esResource = " + esResource); + + // Pull the batchSeconds and windowLength parameters + batchSeconds = Long.parseLong(SystemConfiguration.getProperty("pir.sparkstreaming.batchSeconds", "30")); + windowLength = Long.parseLong(SystemConfiguration.getProperty("pir.sparkstreaming.windowLength", "60")); + if (windowLength % batchSeconds != 0) + { + throw new IllegalArgumentException("batchSeconds = " + batchSeconds + " must divide windowLength = " + windowLength); + } + useQueueStream = SystemConfiguration.getProperty("pir.sparkstreaming.useQueueStream", "false").equals("false"); + + // Set the necessary configurations + SparkConf conf = new SparkConf().setAppName("SparkPIR").setMaster("yarn-cluster"); + conf.set("es.nodes", SystemConfiguration.getProperty("es.nodes", "none")); + conf.set("es.port", SystemConfiguration.getProperty("es.port", "none")); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.set("spark.streaming.stopGracefullyOnShutdown", SystemConfiguration.getProperty("spark.streaming.stopGracefullyOnShutdown", "false")); --- End diff -- Yes - I will update it with the new typing. > Apache Spark Streaming Implementation > ------------------------------------- > > Key: PIRK-21 > URL: https://issues.apache.org/jira/browse/PIRK-21 > Project: PIRK > Issue Type: Improvement > Components: Responder > Reporter: Ellison Anne Williams > Assignee: Ellison Anne Williams > > Provide a Spark streaming implementation for Pirk. > Although there is discussion and a forthcoming JIRA issue for Pirk > integration with Apache Beam, we can, at the very least, use this > implementation to benchmark straight Spark Streaming vs Beam + Spark > Streaming. -- This message was sent by Atlassian JIRA (v6.3.4#6332)