Hello all,

My requirement is to re-read a CSV file from a file path at fixed time
intervals and process its data. The CSV file itself is updated at regular
intervals.
Below is my code:
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = getCsvDataStream(see);
DataStream<Stock> edits = see.addSource(new FetchStock("path/to/csv"));

In FetchStock.java:
public class FetchStock extends RichSourceFunction<Stock> {
    public FetchStock(String csvPath) {
        this.csvPath = csvPath;
    }
}
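
For context, the re-reading behaviour described above can be sketched in plain Java, without Flink. This is only a minimal sketch under stated assumptions: the class name CsvPoller, its run and cancel methods, and the polling interval are hypothetical names chosen for illustration, and the sketch re-emits every non-empty line on each pass rather than tracking which lines are new.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

// Hypothetical helper: re-reads a CSV file at a fixed interval until cancelled.
class CsvPoller {
    private final Path csvPath;
    private final long intervalMillis;
    // volatile so that a cancel() from another thread is seen by the loop
    private volatile boolean running = true;

    CsvPoller(Path csvPath, long intervalMillis) {
        this.csvPath = csvPath;
        this.intervalMillis = intervalMillis;
    }

    // Reads the whole file, hands each non-empty line to `out`, sleeps, repeats.
    void run(Consumer<String> out) throws IOException, InterruptedException {
        while (running) {
            for (String line : Files.readAllLines(csvPath)) {
                if (!line.trim().isEmpty()) {
                    out.accept(line);
                }
            }
            Thread.sleep(intervalMillis);
        }
    }

    // Asks the loop to stop after the current pass and sleep.
    void cancel() {
        running = false;
    }
}
```

In a Flink source, a loop of this shape would presumably live in the source's run method, with cancel flipping the flag. That mapping is an assumption about how the class above would be completed, not something shown in the original code.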

I am trying to adapt the code from *WikipediaAnalysis*, but I get the
following not-serializable exception when adding the source:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object wikiedits.FetchStock@d7b1517 not serializable
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1075)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1057)
    at wikiedits.StockAnalysis.main(StockAnalysis.java:30)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.environment.LocalStreamEnvironment
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
    ... 6 more
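
One way to read the trace, offered as a guess rather than a confirmed diagnosis: the "Caused by" section shows Java serialization descending through defaultWriteFields into a LocalStreamEnvironment, which suggests that the FetchStock instance holds a field referencing the execution environment. The plain-Java sketch below reproduces that pattern without Flink. Env, LeakySource, CleanSource, and isSerializable are hypothetical stand-ins for illustration, not classes from the code above or from Flink.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializationSketch {

    // Hypothetical stand-in for a non-serializable object such as an execution environment.
    static class Env { }

    // Keeps a non-transient reference to Env, so Java serialization of this object fails.
    static class LeakySource implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String csvPath;
        private final Env env;

        LeakySource(String csvPath, Env env) {
            this.csvPath = csvPath;
            this.env = env;
        }
    }

    // Keeps only serializable state (the path), so serialization succeeds.
    static class CleanSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String csvPath;

        CleanSource(String csvPath) {
            this.csvPath = csvPath;
        }
    }

    // Returns true if the object can be written with standard Java serialization.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("leaky: " + isSerializable(new LeakySource("path/to/csv", new Env())));
        System.out.println("clean: " + isSerializable(new CleanSource("path/to/csv")));
    }
}
```

If this reading is right, the source class would need to keep only serializable state such as the path. Marking a field transient is another option in plain Java serialization, since transient fields are skipped when the object is written.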


I have attached Stock.java, which is just a model class with getters and
setters. I am not sure what I am doing wrong.

Best Regards,
Subash Basnet
package wikiedits;

public class Stock {

	// private String symbol;
	// private double price;
	// private int volume;
	// private double pe;
	// private double eps;
	// private double week52low;
	// private double week52high;
	// private double daylow;
	// private double dayhigh;
	// private double movingav50day;
	// private double marketcap;
	// private String name;
	// private String currency;
	// private double shortRatio;
	// private double previousClose;
	// private double open;
	// private String exchange;

	private String id;
	private int timeStamp;
	private double close;
	private double high;
	private double low;
	private double open;
	private int volume;

	public Stock() {
	}

	public Stock(int timeStamp, double close, double high, double low, double open, int volume, String id) {
		this.timeStamp = timeStamp;
		this.close = close;
		this.high = high;
		this.low = low;
		this.open = open;
		this.volume = volume;
		this.id = id;
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public double getLow() {
		return low;
	}

	public void setLow(double low) {
		this.low = low;
	}

	public int getTimeStamp() {
		return timeStamp;
	}

	public void setTimeStamp(int timeStamp) {
		this.timeStamp = timeStamp;
	}

	public double getClose() {
		return close;
	}

	public void setClose(double close) {
		this.close = close;
	}

	public double getHigh() {
		return high;
	}

	public void setHigh(double high) {
		this.high = high;
	}

	public double getOpen() {
		return open;
	}

	public void setOpen(double open) {
		this.open = open;
	}

	public int getVolume() {
		return volume;
	}

	public void setVolume(int volume) {
		this.volume = volume;
	}

}
