Thanks for your response Aditya and Steve. Steve: I have tried specifying both /tmp/filename in hdfs and local path but it didn't work. You may be write that Kie session is configured to access files from Local path. I have attached code here for your reference and if you find some thing wrong, please help to correct it.
Aditya: I have attached code here for reference. --File option will distributed reference file to all node but Kie session is not able to pickup it. Thanks, Abhishek On Fri, Sep 23, 2016 at 2:25 PM, Steve Loughran <ste...@hortonworks.com> wrote: > > On 23 Sep 2016, at 08:33, ABHISHEK <abhi...@gmail.com> wrote: > > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.FileNotFoundException: hdfs:/abc.com:8020/user/ > abhietc/abc.drl (No such file or directory) > at java.io.FileInputStream.open(Native Method) > at java.io.FileInputStream.<init>(FileInputStream.java:146) > at org.drools.core.io.impl.FileSystemResource.getInputStream( > FileSystemResource.java:123) > at org.drools.compiler.kie.builder.impl.KieFileSystemImpl.write( > KieFileSystemImpl.java:58) > > > > Looks like this .KieFileSystemImpl class only works with local files, so > when it gets an HDFS path in it tries to open it and gets confused. > > you may need to write to a local FS temp file then copy it into HDFS >
object Mymain { def main(args: Array[String]): Unit = { // @ abhishek //val fileName = "./abc.drl" // code works if I run app in local mode val fileName = args(0) val conf = new SparkConf().setAppName("LocalStreaming") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1)) val brokers = "190.51.231.132:9092" val groupId = "testgroup" val offsetReset = "smallest" val pollTimeout = "1000" val topics = "NorthPole" val topicsSet = topics.split(",").toSet val kafkaParams = Map[String, String]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers, ConsumerConfig.GROUP_ID_CONFIG -> groupId, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false", "spark.kafka.poll.time" -> pollTimeout) val detailTable = "emp1 " val summaryTable= "emp2" val confHBase = HBaseConfiguration.create() confHBase.set("hbase.zookeeper.quorum", "190.51.231.132") confHBase.set("hbase.zookeeper.property.clientPort", "2181") confHBase.set("hbase.master", "190.51.231.132:60000") val emp_detail_config = Job.getInstance(confHBase) val emp_summary_config = Job.getInstance(confHBase) emp_detail_config.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "emp_detail"); emp_detail_config.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) emp_summary_config.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "emp_summary") emp_summary_config.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) messages.foreachRDD(rdd => { if (rdd.count > 0) { val messageRDD: RDD[String] = rdd.map { y => y._2 } val inputJsonObjectRDD = messageRDD.map(row => Utilities.convertToJsonObject(row)) inputJsonObjectRDD.map(row => BuildJsonStrctures.buildTaxCalcDetail(row)).saveAsNewAPIHadoopDataset(emp_detail_config.getConfiguration) val inputObjectRDD = messageRDD.map(row => Utilities.convertToSubmissionJavaObject(row)) val executedObjectRDD = inputObjectRDD.mapPartitions(row => KieSessionFactory.execute(row, fileName.toString())) val executedJsonRDD = executedObjectRDD.map(row => Utilities.convertToSubmissionJSonString(row)) .map(row => Utilities.convertToJsonObject(row)) val summaryInputJsonRDD = executedObjectRDD summaryInputJsonRDD.map(row => BuildJsonStrctures.buildSummary2(row)).saveAsNewAPIHadoopDataset(emp_summary_config.getConfiguration) } else { println("No message received") //this works only in master local mode } }) ssc.start() ssc.awaitTermination() } }
--------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org