Dear sir,
I have a question about how to use ConsoleIO with the Beam Java SDK.
My objective (which I had highlighted in yellow) is to write the PCollection "data" to the
console, and then use it (as an RDD?) as another variable for further processing.
If you need any further information, please let me know and I will provide it
as soon as possible.
I am looking forward to hearing from you.
My Java code is as follows:
“
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;
import org.joda.time.MutableDateTime;

public class ConsoleIOExample {

  public static void main(String[] args) throws IOException {
    // Start time: 2017-07-12 14:00:00.000, then shifted forward by 8 hours.
    MutableDateTime mutableNow = Instant.now().toMutableDateTime();
    mutableNow.setDateTime(2017, 7, 12, 14, 0, 0, 0);
    Instant starttime = mutableNow.toInstant().plus(8 * 60 * 60 * 1000);

    // Consecutive elements are 2 minutes (120,000 ms) apart.
    int min = 2;
    int sec = min * 60;
    int millsec = sec * 1000;

    double[] value = new double[] {1.0, 2.0, 3.0, 4.0, 5.0};
    // KV<Integer, Double> is used instead of javafx.util.Pair: Beam infers a coder for KV,
    // and JavaFX is not bundled with every JDK.
    List<TimestampedValue<KV<String, KV<Integer, Double>>>> dataList = new ArrayList<>();
    int n = value.length;
    int count = 0;
    for (int i = 0; i < n; i++) {
      count = count + 1;
      // NOTE: all three branches use the key "M1", and the last branch is unreachable
      // because i < n = 5. Change the keys (e.g. "M2", "M3") if separate keys are intended.
      if (i <= 3) {
        Instant M1_time = starttime.plus(millsec * count);
        dataList.add(TimestampedValue.of(KV.of("M1", KV.of(i, value[i])), M1_time));
      } else if (4 <= i && i < 5) {
        Instant M2_time = starttime.plus(millsec * count);
        dataList.add(TimestampedValue.of(KV.of("M1", KV.of(i, value[i])), M2_time));
      } else {
        Instant M3_time = starttime.plus(millsec * count);
        dataList.add(TimestampedValue.of(KV.of("M1", KV.of(i, value[i])), M3_time));
      }
      System.out.println("raw_data=" + dataList.get(i));
    }

    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    options.setSparkMaster("local[4]");
    Pipeline p = Pipeline.create(options);

    // Create.timestamped gives each element the timestamp stored in its TimestampedValue.
    PCollection<KV<String, KV<Integer, Double>>> data =
        p.apply("create data with time", Create.timestamped(dataList));

    // ConsoleIO.Write.out() is a factory method, so it must be called with parentheses.
    data.apply("spark_write_on_console",
        ConsoleIO.Write.<KV<String, KV<Integer, Double>>>out());
    p.run().waitUntilFinish();
  }
}
”
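For reference, the timestamp arithmetic in the code above (a start time plus `millsec * count`, i.e. 2-minute steps) can be checked without Beam or Spark. This is a minimal sketch under stated assumptions: it uses `java.time` instead of Joda-Time and `AbstractMap.SimpleImmutableEntry` instead of `javafx.util.Pair`, it assumes the intended start time is 2017-07-12T14:00:00Z, and the names `RawDataSketch`, `Sample`, and `buildSamples` are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch: builds the same raw samples as the loop in the question,
// using only the JDK (java.time instead of Joda-Time, SimpleImmutableEntry instead of Pair).
public class RawDataSketch {

  /** One raw sample: a key, an (index, value) pair, and an event timestamp. */
  static final class Sample {
    final String key;
    final Map.Entry<Integer, Double> indexAndValue;
    final Instant timestamp;

    Sample(String key, Map.Entry<Integer, Double> indexAndValue, Instant timestamp) {
      this.key = key;
      this.indexAndValue = indexAndValue;
      this.timestamp = timestamp;
    }

    @Override
    public String toString() {
      return "KV{" + key + ", " + indexAndValue + "} @ " + timestamp;
    }
  }

  /** Element i gets timestamp start + (i + 1) * spacing, like starttime.plus(millsec * count). */
  static List<Sample> buildSamples(String key, double[] values, Instant start, Duration spacing) {
    List<Sample> samples = new ArrayList<>();
    for (int i = 0; i < values.length; i++) {
      Instant ts = start.plus(spacing.multipliedBy(i + 1));
      samples.add(new Sample(key, new SimpleImmutableEntry<>(i, values[i]), ts));
    }
    return samples;
  }

  public static void main(String[] args) {
    // Assumption: the intended start time is 2017-07-12T14:00:00Z (UTC).
    Instant start = Instant.parse("2017-07-12T14:00:00Z");
    double[] value = {1.0, 2.0, 3.0, 4.0, 5.0};
    for (Sample s : buildSamples("M1", value, start, Duration.ofMinutes(2))) {
      System.out.println("raw_data=" + s);
    }
  }
}
```

Starting from a fixed `Instant` keeps the timestamps independent of the JVM's default time zone, which is one way to avoid a manual 8-hour shift.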
Thanks very much
Sincerely yours,
Liang-Sian Lin, Dr.
Oct 20 2017
--
This email may contain confidential information of ITRI (Industrial Technology Research
Institute). If you are not the intended recipient, please do not use or disclose it in
any way, and delete it.