Dear Sir or Madam,

I have a question about how to use ConsoleIO from the Beam Java SDK's Spark runner.

My objective is to write the PCollection "data" to the console, and then continue
to use it (what is its type, an RDD?) as another variable for further processing.
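My current understanding (please correct me if this is wrong) is that a PCollection is not an RDD, and that the same PCollection can be applied to more than one transform, so after printing I could keep using the same variable. Roughly like this sketch (not tested; `Keys.create()` is only an illustration of a second branch):

```java
// Sketch only: the same PCollection "data" feeds two branches.
PCollection<KV<String, Pair<Integer, Double>>> data =
    p.apply("create data with time", Create.timestamped(dataList));

// Branch 1: print on the console (ConsoleIO is specific to the Spark runner).
data.apply("spark_write_on_console", ConsoleIO.Write.out());

// Branch 2: continue processing the same collection, e.g. extract the keys.
PCollection<String> keys = data.apply("extract keys", Keys.<String>create());
```

Is this the intended usage, or do I need to capture the console output into another variable first?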

If any further information is needed, please let me know and I will provide it
as soon as possible.

I am looking forward to hearing from you.

My Java code is as follows:
“

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.spark.SparkPipelineOptions;

import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;

import javafx.util.Pair;

import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.MutableDateTime;

public static void main(String[] args) throws IOException  {
       MutableDateTime mutableNow = Instant.now().toMutableDateTime();
       mutableNow.setDateTime(2017, 7, 12, 14, 0, 0, 0);
       Instant starttime = mutableNow.toInstant().plus(8*60*60*1000);
       int min;
       int sec;
       int millsec;
       min=2;
       sec=min*60;
       millsec=sec*1000;
       double[] value=new double[] {1.0,2.0,3.0,4.0,5.0};
       List<TimestampedValue<KV<String, Pair<Integer, Double>>>> dataList = new ArrayList<>();
       int n=value.length;
       int count=0;
       for (int i = 0; i < n; i++)
        {
           count = count + 1;
           if (i <= 3)
           {
              Instant M1_time = starttime.plus(millsec * count);
              dataList.add(TimestampedValue.of(
                  KV.of("M1", new Pair<Integer, Double>(i, value[i])), M1_time));
           }
           else if (4 <= i && i < 5)
           {
              // keys "M2"/"M3" match the *_time variable names below;
              // originally all three branches used the key "M1"
              Instant M2_time = starttime.plus(millsec * count);
              dataList.add(TimestampedValue.of(
                  KV.of("M2", new Pair<Integer, Double>(i, value[i])), M2_time));
           }
           else
           {
              Instant M3_time = starttime.plus(millsec * count);
              dataList.add(TimestampedValue.of(
                  KV.of("M3", new Pair<Integer, Double>(i, value[i])), M3_time));
           }
           System.out.println("raw_data=" + dataList.get(i));
        }

       SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
       options.setRunner(SparkRunner.class);
       options.setSparkMaster("local[4]");
       Pipeline p = Pipeline.create(options);

       PCollection<KV<String, Pair<Integer, Double>>> data = p.apply("create data with time", Create.timestamped(dataList));


       data.apply("spark_write_on_console", ConsoleIO.Write.out());

       p.run().waitUntilFinish();
}
”
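As a side note, the `plus(8*60*60*1000)` in my code is meant as an 8-hour shift. If the 14:00 is taken as UTC (my Joda `MutableDateTime` actually uses the machine's default zone), the same Instant can be computed with the JDK's `java.time` instead of manual millisecond arithmetic, independent of Beam:

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class StartTimeDemo {
    public static void main(String[] args) {
        // 2017-07-12 14:00 interpreted as UTC, then shifted by 8 hours,
        // mirroring mutableNow.toInstant().plus(8 * 60 * 60 * 1000) above.
        Instant base = LocalDateTime.of(2017, 7, 12, 14, 0)
                .toInstant(ZoneOffset.UTC);
        Instant starttime = base.plus(Duration.ofHours(8));
        System.out.println(starttime); // prints 2017-07-12T22:00:00Z
    }
}
```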

Thanks very much

Sincerely yours,
Liang-Sian Lin, Dr.
Oct 20 2017


--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain 
confidential information. Please do not use or disclose it in any way and 
delete it if you are not the intended recipient.
