Thanks for your responses, Aditya and Steve.
Steve:
I have tried specifying /tmp/filename both as an HDFS path and as a local
path, but neither worked.
You may be right that the Kie session is configured to access files from a
local path only.
I have attached the code here for your reference; if you find something
wrong, please help me correct it.

Aditya:
I have attached the code here for reference. The --files option
distributes the reference file to all nodes, but the Kie session is not
able to pick it up. One way to resolve the shipped copy is sketched below.
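If the file is shipped with spark-submit's --files, each executor receives
a local copy whose absolute path can be resolved with SparkFiles.get. A
minimal sketch, assuming the file was submitted as --files abc.drl and
that KieSessionFactory.execute accepts a plain local path (both are
assumptions about this setup):

import org.apache.spark.SparkFiles

// Resolve the executor-local copy of a file shipped via --files.
// SparkFiles.get returns an absolute path on the node where it runs,
// so call it inside the closure that builds the Kie session.
val executedObjectRDD = inputObjectRDD.mapPartitions { rows =>
  val localDrlPath = SparkFiles.get("abc.drl") // hypothetical file name
  KieSessionFactory.execute(rows, localDrlPath)
}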

Thanks,
Abhishek

On Fri, Sep 23, 2016 at 2:25 PM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
> On 23 Sep 2016, at 08:33, ABHISHEK <abhi...@gmail.com> wrote:
>
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: hdfs:/abc.com:8020/user/abhietc/abc.drl (No such file or directory)
>         at java.io.FileInputStream.open(Native Method)
>         at java.io.FileInputStream.<init>(FileInputStream.java:146)
>         at org.drools.core.io.impl.FileSystemResource.getInputStream(FileSystemResource.java:123)
>         at org.drools.compiler.kie.builder.impl.KieFileSystemImpl.write(KieFileSystemImpl.java:58)
>
>
>
> Looks like this KieFileSystemImpl class only works with local files, so
> when it gets an HDFS path it tries to open it locally and gets confused.
>
> You may need to write to a local FS temp file, then copy it into HDFS.
>
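Following Steve's pointer, a minimal sketch of the workaround, with the
direction reversed for this case: pull abc.drl out of HDFS onto local disk
so the Kie session can open it with java.io. FileSystem and
copyToLocalFile are the standard Hadoop APIs; the HDFS path and temp
directory here are assumptions:

import java.nio.file.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Copy the rules file from HDFS to a local temp file so that
// file-based readers (java.io.FileInputStream) can open it.
val hadoopConf = new Configuration() // assumes fs.defaultFS points at the cluster
val fs = FileSystem.get(hadoopConf)
val localDir = Files.createTempDirectory("drools")
val localDrl = new Path(localDir.toString, "abc.drl")
fs.copyToLocalFile(new Path("/user/abhietc/abc.drl"), localDrl)
// Pass localDrl.toString (a plain local path) to the Kie session.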
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import kafka.serializer.StringDecoder

object Mymain {
  def main(args: Array[String]): Unit = {

    // @abhishek
    // val fileName = "./abc.drl"  // works when the app runs in local mode

    val fileName = args(0)
    val conf = new SparkConf().setAppName("LocalStreaming")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    val brokers = "190.51.231.132:9092"
    val groupId = "testgroup"
    val offsetReset = "smallest"
    val pollTimeout = "1000"
    val topics = "NorthPole"
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset,
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
      "spark.kafka.poll.time" -> pollTimeout)

    val detailTable = "emp_detail"
    val summaryTable = "emp_summary"
    val confHBase = HBaseConfiguration.create()
    confHBase.set("hbase.zookeeper.quorum", "190.51.231.132")
    confHBase.set("hbase.zookeeper.property.clientPort", "2181")
    confHBase.set("hbase.master", "190.51.231.132:60000")

    val emp_detail_config = Job.getInstance(confHBase)
    emp_detail_config.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, detailTable)
    emp_detail_config.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val emp_summary_config = Job.getInstance(confHBase)
    emp_summary_config.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, summaryTable)
    emp_summary_config.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    messages.foreachRDD { rdd =>
      if (rdd.count > 0) {
        val messageRDD: RDD[String] = rdd.map(_._2)

        val inputJsonObjectRDD = messageRDD.map(row => Utilities.convertToJsonObject(row))
        inputJsonObjectRDD
          .map(row => BuildJsonStrctures.buildTaxCalcDetail(row))
          .saveAsNewAPIHadoopDataset(emp_detail_config.getConfiguration)

        val inputObjectRDD = messageRDD.map(row => Utilities.convertToSubmissionJavaObject(row))
        // The Kie session is built on the executors, so fileName must
        // resolve to a readable path on every worker node.
        val executedObjectRDD = inputObjectRDD.mapPartitions(rows =>
          KieSessionFactory.execute(rows, fileName))
        val executedJsonRDD = executedObjectRDD
          .map(row => Utilities.convertToSubmissionJSonString(row))
          .map(row => Utilities.convertToJsonObject(row))

        executedObjectRDD
          .map(row => BuildJsonStrctures.buildSummary2(row))
          .saveAsNewAPIHadoopDataset(emp_summary_config.getConfiguration)
      } else {
        println("No message received") // visible only when running in local mode
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
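If modifying KieSessionFactory is an option, another route avoids local
files altogether: stream the .drl out of HDFS and hand Drools an
InputStream resource instead of a file path, so KieFileSystem never
touches java.io.File (the source of the FileNotFoundException above). A
sketch assuming Drools 6.x (KieServices, ResourceFactory) and the HDFS
path from the stack trace:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.kie.api.KieServices
import org.kie.internal.io.ResourceFactory

// Build a KieContainer from a .drl streamed out of HDFS.
val fs = FileSystem.get(new Configuration()) // assumes fs.defaultFS points at the cluster
val drlStream = fs.open(new Path("/user/abhietc/abc.drl"))
val ks = KieServices.Factory.get()
val kfs = ks.newKieFileSystem()
kfs.write("src/main/resources/abc.drl", ResourceFactory.newInputStreamResource(drlStream))
ks.newKieBuilder(kfs).buildAll()
val kieContainer = ks.newKieContainer(ks.getRepository.getDefaultReleaseId)
val kieSession = kieContainer.newKieSession()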