[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15428689#comment-15428689 ]
ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/74#discussion_r75538064

--- Diff: src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java ---
@@ -0,0 +1,327 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.apache.pirk.storm;
+
+import kafka.admin.AdminUtils;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pirk.encryption.Paillier;
+import org.apache.pirk.querier.wideskies.Querier;
+import org.apache.pirk.querier.wideskies.QuerierConst;
+import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse;
+import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.responder.wideskies.storm.*;
+import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.schema.query.filter.StopListFilter;
+import org.apache.pirk.schema.response.QueryResponseJSON;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.pirk.test.utils.BaseTests;
+import org.apache.pirk.test.utils.Inputs;
+import org.apache.pirk.test.utils.TestUtils;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.storm.Config;
+import org.apache.storm.ILocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.SpoutConfig;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.spout.SchemeAsMultiScheme;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MkClusterParam;
+import org.apache.storm.testing.TestJob;
+import org.json.simple.JSONObject;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.math.BigInteger;
+import java.util.*;
+
+@Category(IntegrationTest.class)
+public class KafkaStormIntegrationTest
+{
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaStormIntegrationTest.class);
+
+  private static final LocalFileSystemStore localStore = new LocalFileSystemStore();
+
+  private static TestingServer zookeeperLocalCluster;
+  private static KafkaServer kafkaLocalBroker;
+  private static ZkClient zkClient;
+
+  private static final String topic = "pirk_test_topic";
+  private static final String kafkaTmpDir = "/tmp/kafka";
+
+  private static File fileQuery;
+  private static File fileQuerier;
+  private static String localStopListFile;
+
+  private QueryInfo queryInfo;
+  private BigInteger nSquared;
+
+  @Test
+  public void testKafkaStormIntegration() throws Exception
+  {
+    SystemConfiguration.setProperty("pir.limitHitsPerSelector", "true");
+    SystemConfiguration.getProperty("pir.maxHitsPerSelector", "10");
+    SystemConfiguration.setProperty("storm.spout.parallelism", "1");
+    SystemConfiguration.setProperty("storm.hashbolt.parallelism", "1");
+    SystemConfiguration.setProperty("storm.encrowcalcbolt.parallelism", "2");
+    SystemConfiguration.setProperty("storm.enccolmultbolt.parallelism", "2");
+    SystemConfiguration.setProperty("storm.encrowcalcbolt.ticktuple", "4");
+    SystemConfiguration.setProperty("storm.rowDivs", "2");
+    SystemConfiguration.setProperty("hdfs.use", "false");
+
+    startZookeeper();
+    startKafka();
+
+    SystemConfiguration.setProperty("kafka.topic", topic);
+    SystemConfiguration.setProperty("storm.topoName", "pirTest");
+
+    // Create encrypted file
+    localStopListFile = Inputs.createPIRStopList(null, false);
+    SystemConfiguration.setProperty("pir.stopListFile", localStopListFile);
+    Inputs.createSchemaFiles(StopListFilter.class.getName());
+
+    // Perform encryption. Set queryInfo, nSquared, fileQuery, and fileQuerier
+    performEncryption();
+    SystemConfiguration.setProperty("pir.queryInput", fileQuery.getAbsolutePath());
+
+    KafkaProducer producer = new KafkaProducer<String,String>(createKafkaProducerConfig());
+    loadTestData(producer);
+
+
+    logger.info("Test (splitPartitions,saltColumns) = (true,true)");
+    SystemConfiguration.setProperty("storm.splitPartitions", "true");
+    SystemConfiguration.setProperty("storm.saltColumns", "true");
+    runTest();
+
+    logger.info("Test (splitPartitions,saltColumns) = (true,false)");
+    SystemConfiguration.setProperty("storm.splitPartitions", "true");
+    SystemConfiguration.setProperty("storm.saltColumns", "false");
+    runTest();
+
+    logger.info("Test (splitPartitions,saltColumns) = (false,true)");
+    SystemConfiguration.setProperty("storm.splitPartitions", "false");
+    SystemConfiguration.setProperty("storm.saltColumns", "true");
+    runTest();
+
+    logger.info("Test (splitPartitions,saltColumns) = (false,false)");
+    SystemConfiguration.setProperty("storm.splitPartitions", "false");
+    SystemConfiguration.setProperty("storm.saltColumns", "false");
+    runTest();
+  }
+
+  private void runTest() throws Exception
+  {
+    File responderFile = File.createTempFile("responderFile", ".txt");
+    logger.info("Starting topology.");
+    runTopology(responderFile);
+
+    // decrypt results
+    logger.info("Decrypting results. " + responderFile.length());
+    File fileFinalResults = performDecryption(responderFile);
+
+    // check results
+    List<QueryResponseJSON> results = TestUtils.readResultsFile(fileFinalResults);
+    BaseTests.checkDNSHostnameQueryResults(results, false, 7, false, Inputs.createJSONDataElements());
+
+    responderFile.deleteOnExit();
+    fileFinalResults.deleteOnExit();
+  }
+
+  private void runTopology(File responderFile) throws Exception
+  {
+    MkClusterParam mkClusterParam = new MkClusterParam();
+    // The test sometimes fails because of timing issues when more than 1 supervisor set.
+    mkClusterParam.setSupervisors(1);
+
+    // Maybe using "withSimulatedTimeLocalCluster" would be better to avoid worrying about timing.
+    Config conf = PirkTopology.createStormConf();
+    conf.put(StormConstants.OUTPUT_FILE_KEY, responderFile.getAbsolutePath());
+    conf.put(StormConstants.N_SQUARED_KEY, nSquared.toString());
+    conf.put(StormConstants.QUERY_INFO_KEY, queryInfo.toMap());
+    // conf.setDebug(true);
+    mkClusterParam.setDaemonConf(conf);
+
+    TestJob testJob = createPirkTestJob(conf);
+    Testing.withLocalCluster(mkClusterParam, testJob);
+    // Testing.withSimulatedTimeLocalCluster(mkClusterParam, testJob);
+  }
+
+  private TestJob createPirkTestJob(final Config config)
+  {
+    final SpoutConfig kafkaConfig = setUpTestKafkaSpout(config);
+    return new TestJob()
+    {
+      StormTopology topology = PirkTopology.getPirkTopology(kafkaConfig);
+
+      @Override
+      public void run(ILocalCluster iLocalCluster) throws Exception
+      {
+        iLocalCluster.submitTopology("pirk_integration_test", config, topology);
+        logger.info("Pausing for setup.");
+        //Thread.sleep(4000);
+        //KafkaProducer producer = new KafkaProducer<String,String>(createKafkaProducerConfig());
+        //loadTestData(producer);
+        Thread.sleep(6000);
+        OutputBolt.latch.await();
+        logger.info("Finished...");
+      }
+    };
+  }
+
+  private SpoutConfig setUpTestKafkaSpout(Config conf)
+  {
+    ZkHosts zkHost = new ZkHosts(zookeeperLocalCluster.getConnectString());
+
+    SpoutConfig kafkaConfig = new SpoutConfig(zkHost, topic, "/pirk_test_root", "pirk_integr_test_spout");
+    kafkaConfig.scheme = new SchemeAsMultiScheme(new PirkHashScheme(conf));
+    logger.info("KafkaConfig initialized...");
+
+    return kafkaConfig;
+  }
+
+  private void startZookeeper() throws Exception
+  {
+    logger.info("Starting zookeeper.");
+    zookeeperLocalCluster = new TestingServer();
+    zookeeperLocalCluster.start();
+    logger.info("Zookeeper initialized.");
+
+  }
+
+  private void startKafka() throws Exception
+  {
+    FileUtils.deleteDirectory(new File(kafkaTmpDir));
+
+    Properties props = new Properties();
+    props.setProperty("zookeeper.session.timeout.ms", "100000");
+    props.put("advertised.host.name", "localhost");
+    props.put("port", 11111);
+    // props.put("broker.id", "0");
+    props.put("log.dir", kafkaTmpDir);
+    props.put("enable.zookeeper", "true");
+    props.put("zookeeper.connect", zookeeperLocalCluster.getConnectString());
+    KafkaConfig kafkaConfig = KafkaConfig.fromProps(props);
+    kafkaLocalBroker = new KafkaServer(kafkaConfig, new SystemTime(), scala.Option.apply("kafkaThread"));
+    kafkaLocalBroker.startup();
+
+    zkClient = new ZkClient(zookeeperLocalCluster.getConnectString(), 60000, 60000, ZKStringSerializer$.MODULE$);
+    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperLocalCluster.getConnectString()), false);
+    //ZkUtils zkUtils = ZkUtils.apply(zookeeperLocalCluster.getConnectString(), 60000, 60000, false);
+    AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception
+  {
+    zkClient.close();
+    kafkaLocalBroker.shutdown();
+    zookeeperLocalCluster.stop();
+
+    FileUtils.deleteDirectory(new File(kafkaTmpDir));
+
+    fileQuery.delete();
+    fileQuerier.delete();
+
+    new File(localStopListFile).delete();
+  }
+
+  private HashMap<String,Object> createKafkaProducerConfig()
+  {
+    String kafkaHostName = "localhost";
+    Integer kafkaPorts = 11111;
+    HashMap<String,Object> config = new HashMap<String,Object>();
+    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHostName + ":" + kafkaPorts);
+    config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+    return config;
+  }
+
+  private void loadTestData(KafkaProducer producer)
+  {
+    for (JSONObject dataRecord : Inputs.createJSONDataElements())
+    {
+      logger.info("Sending record to Kafka " + dataRecord.toString());
+      producer.send(new ProducerRecord<String,String>(topic, dataRecord.toString()));
+    }
+  }
+
+  private void performEncryption() throws Exception
+  {
+    // ArrayList<String> selectors = BaseTests.selectorsDomain;
+    ArrayList<String> selectors = new ArrayList<>(Arrays.asList("s.t.u.net", "d.e.com", "r.r.r.r", "a.b.c.com", "something.else", "x.y.net"));
+    String queryType = Inputs.DNS_HOSTNAME_QUERY;
+
+    Paillier paillier = new Paillier(BaseTests.paillierBitSize, BaseTests.certainty);
+
+    nSquared = paillier.getNSquared();
+
+    queryInfo = new QueryInfo(BaseTests.queryIdentifier, selectors.size(), BaseTests.hashBitSize, BaseTests.hashKey, BaseTests.dataPartitionBitSize, queryType,
+        false, true, false);
+
+    // Perform the encryption
+    logger.info("Performing encryption of the selectors - forming encrypted query vectors:");
+    EncryptQuery encryptQuery = new EncryptQuery(queryInfo, selectors, paillier);
+    encryptQuery.encrypt(1);
+    logger.info("Completed encryption of the selectors - completed formation of the encrypted query vectors:");
+
+    // Write out files.
+    fileQuerier = File.createTempFile("pir_integrationTest-" + QuerierConst.QUERIER_FILETAG, ".txt");
+    fileQuery = File.createTempFile("pir_integrationTest-" + QuerierConst.QUERY_FILETAG, ".txt");
+
+    localStore.store(fileQuerier.getAbsolutePath(), encryptQuery.getQuerier());
+    localStore.store(fileQuery, encryptQuery.getQuery());
+  }
+
+  private File performDecryption(File responseFile) throws Exception
+  {
+    File finalResults = File.createTempFile("finalFileResults", ".txt");
+    String querierFilePath = fileQuerier.getAbsolutePath();
+    String responseFilePath = responseFile.getAbsolutePath();
+    String outputFile = finalResults.getAbsolutePath();
+    int numThreads = 1;
+
+    Response response = localStore.recall(responseFilePath, Response.class);
+    Querier querier = localStore.recall(querierFilePath, Querier.class);
+
+    // Perform decryption and output the result file
+    DecryptResponse decryptResponse = new DecryptResponse(response, querier);
+    decryptResponse.decrypt(numThreads);
+    decryptResponse.writeResultFile(outputFile);
+    return finalResults;
+  }
+
+}
--- End diff --

This is a cool integration test. As we add more backends, we may need to consider abstracting some of the ZooKeeper and Kafka setup code into a base AbstractPirkIT class and extending it for the different backends.
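To make that suggestion concrete, here is a rough sketch of what such a shared base class could look like. The class name AbstractPirkIT comes from the comment above; the package, method names, and the use of @BeforeClass/@AfterClass for the lifecycle are hypothetical choices, and the ZooKeeper/Kafka setup is lifted verbatim from the test above (including SystemTime and the broker settings, which are assumed to resolve the same way they do there):

package org.apache.pirk.test.utils;  // hypothetical location

import java.io.File;
import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;

/**
 * Sketch of a base class gathering the embedded ZooKeeper/Kafka lifecycle code
 * so that each backend integration test can extend it.
 */
public abstract class AbstractPirkIT
{
  protected static final String topic = "pirk_test_topic";
  protected static final String kafkaTmpDir = "/tmp/kafka";

  protected static TestingServer zookeeperLocalCluster;
  protected static KafkaServer kafkaLocalBroker;
  protected static ZkClient zkClient;

  @BeforeClass
  public static void startZookeeperAndKafka() throws Exception
  {
    // Embedded ZooKeeper via the Curator test server
    zookeeperLocalCluster = new TestingServer();
    zookeeperLocalCluster.start();

    // Embedded Kafka broker, configured exactly as in the test above
    FileUtils.deleteDirectory(new File(kafkaTmpDir));
    Properties props = new Properties();
    props.setProperty("zookeeper.session.timeout.ms", "100000");
    props.put("advertised.host.name", "localhost");
    props.put("port", 11111);
    props.put("log.dir", kafkaTmpDir);
    props.put("enable.zookeeper", "true");
    props.put("zookeeper.connect", zookeeperLocalCluster.getConnectString());
    // SystemTime and the thread-name prefix are taken verbatim from the test above
    kafkaLocalBroker = new KafkaServer(KafkaConfig.fromProps(props), new SystemTime(), scala.Option.apply("kafkaThread"));
    kafkaLocalBroker.startup();

    // Create the shared test topic
    zkClient = new ZkClient(zookeeperLocalCluster.getConnectString(), 60000, 60000, ZKStringSerializer$.MODULE$);
    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperLocalCluster.getConnectString()), false);
    AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties());
  }

  @AfterClass
  public static void stopZookeeperAndKafka() throws Exception
  {
    zkClient.close();
    kafkaLocalBroker.shutdown();
    zookeeperLocalCluster.stop();
    FileUtils.deleteDirectory(new File(kafkaTmpDir));
  }
}

KafkaStormIntegrationTest (and future backend ITs) would then extend this class and keep only the backend-specific pieces: topology setup, query encryption, and result decryption.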
> Add Streaming Implementation for Apache Storm
> ---------------------------------------------
>
>                 Key: PIRK-4
>                 URL: https://issues.apache.org/jira/browse/PIRK-4
>             Project: PIRK
>          Issue Type: Task
>          Components: Responder
>            Reporter: Chris Harris
>            Assignee: Chris Harris
>
> Per the Pirk Roadmap, this is a feature to add support for Apache Storm



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)