Hi,

You don’t need to call the execute() method after print(). The print()
method triggers execution itself, so when execute() runs afterwards there
are no new data sinks left to execute. That is why the exception is raised.
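
As a rough sketch of what I mean (not your exact program): the object name,
the stand-in tuples from fromElements, and the output path in the comment are
placeholders I made up for illustration, and I am assuming the batch DataSet
API with the flink-scala dependency on the classpath.

```scala
import org.apache.flink.api.scala._

object PrintTriggersExecution {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Stand-in data (an assumption for this sketch); the original program
    // would call readBuildingInfo(env, "./SensorFiles/building.csv") here.
    val buildings = env.fromElements(
      (1, "AC1000", "USA"),
      (2, "FN39TG", "France")
    )

    // print() is an eager sink: it runs the job and prints the result.
    buildings.print()

    // No env.execute() after this point. With no new sink defined, calling it
    // would raise "No new data sinks have been defined since the last execution".

    // If an explicit execute() is wanted, define a lazy sink first, for example:
    //   buildings.writeAsText("./buildings-out.txt") // hypothetical path
    //   env.execute("HVAC Simulation")
  }
}
```

In short, either rely on print() (or collect() / count()) alone, or pair a
lazy sink such as writeAsText() with execute(), but do not follow print()
with execute() unless a new sink has been added in between.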

Regards,
Chiwan Park

> On Apr 27, 2016, at 6:35 PM, nsengupta <sengupta.nirma...@gmail.com> wrote:
> 
> Till,
> 
> Thanks for looking into this.
> 
> I have removed the toList() from the collect() function, to align the code
> with what I generally do in a Flink application. It throws an Exception, and
> I can't figure out why.
> 
> *Here's my code (shortened for brevity):*
> 
> case class BuildingInformation(buildingID: Int, buildingManager: Int,
> buildingAge: Int, productID: String, country: String)
> 
> object HVACReadingsAnalysis {
> 
>  def main(args: Array[String]): Unit = {
> 
>    val envDefault = ExecutionEnvironment.getExecutionEnvironment
> 
>    val buildings =
> readBuildingInfo(envDefault,"./SensorFiles/building.csv")
> 
>    buildings.print
> 
>    envDefault.execute("HVAC Simulation")
>  }
> 
>  private def readBuildingInfo(env: ExecutionEnvironment, inputPath: String)
> = {
> 
>   // [NS]: I can see the lines, read correctly from the CSV file here
>    println("As read from CSV file")
>    println(Source.fromFile(inputPath).getLines.toList.mkString("#\n"))
> 
>    // [NS]: Then, I read the same file using the library function
>   env.readCsvFile [BuildingInformation] (
>      inputPath,
>      ignoreFirstLine = true,
>      pojoFields =
> Array("buildingID","buildingManager","buildingAge","productID","country")
>    )
>  }
> 
> 
> *Relevant portion of the output:*
> 
> As read from CSV file
> BuildingID,BuildingMgr,BuildingAge,HVACproduct,Country#
> 1,M1,25,AC1000,USA#
> 2,M2,27,FN39TG,France#
> 3,M3,28,JDNS77,Brazil#
> 4,M4,17,GG1919,Finland#
> 5,M5,3,ACMAX22,Hong Kong#
> 6,M6,9,AC1000,Singapore#
> 7,M7,13,FN39TG,South Africa#
> 8,M8,25,JDNS77,Australia#
> 9,M9,11,GG1919,Mexico#
> 10,M10,23,ACMAX22,China#
> 11,M11,14,AC1000,Belgium#
> 12,M12,26,FN39TG,Finland#
> 13,M13,25,JDNS77,Saudi Arabia#
> 14,M14,17,GG1919,Germany#
> 15,M15,19,ACMAX22,Israel#
> 16,M16,23,AC1000,Turkey#
> 17,M17,11,FN39TG,Egypt#
> 18,M18,25,JDNS77,Indonesia#
> 19,M19,14,GG1919,Canada#
> 20,M20,19,ACMAX22,Argentina
> 15:34:18,914 INFO  org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
> 15:34:19,104 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster - Starting FlinkMiniCluster.
> 15:34:19,912 INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
> 
> 
> // ..
> // ... more log statements
> // ..
> 
> Exception in thread "main" java.lang.RuntimeException: No new data sinks
> have been defined since the last execution. The last execution refers to the
> latest call to 'execute()', 'count()', 'collect()', or 'print()'.
>       at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:979)
>       at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
>       at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:84)
>       at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
>       at main.scala.hortonworks.tutorial.HVACReadingsAnalysis$.main(HVACReadingsAnalysis.scala:60)
>       at main.scala.hortonworks.tutorial.HVACReadingsAnalysis.main(HVACReadingsAnalysis.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> 
> Process finished with exit code 1
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Discarding-header-from-CSV-file-tp6474p6494.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
