Hi,

I started to work on the ConsoleIO (and SocketIO too), but it's not yet merged.

I can provide a SNAPSHOT to you if you want to try it.

Regards
JB

On 10/20/2017 04:14 AM, [email protected] wrote:
Dear sir,

I have a question about how to use ConsoleIO in the Beam Java SDK.

My objective (highlighted in yellow in the code below) is to write the PCollection "data" to the console, and then use it (type: RDD?) as another variable for further work.

If any further information is needed, please let me know and I will provide it as soon as possible.

I am looking forward to hearing from you.

My Java code is as follows:

“
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.spark.SparkPipelineOptions;

import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;

import javafx.util.Pair;

import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.MutableDateTime;

public static void main(String[] args) throws IOException {
    // Start time: 2017-07-12 14:00 plus 8 hours.
    MutableDateTime mutableNow = Instant.now().toMutableDateTime();
    mutableNow.setDateTime(2017, 7, 12, 14, 0, 0, 0);
    Instant starttime = mutableNow.toInstant().plus(8*60*60*1000);

    // Step between consecutive elements: 2 minutes, in milliseconds.
    int min;
    int sec;
    int millsec;
    min = 2;
    sec = min*60;
    millsec = sec*1000;

    double[] value = new double[] {1.0, 2.0, 3.0, 4.0, 5.0};
    List<TimestampedValue<KV<String, Pair<Integer, Double>>>> dataList = new ArrayList<>();
    int n = value.length;
    int count = 0;
    for (int i = 0; i < n; i++)
    {
        count = count + 1;
        if (i <= 3)
        {
            Instant M1_time = starttime.plus(millsec*count);
            dataList.add(TimestampedValue.of(KV.of("M1", new Pair<Integer, Double>(i, value[i])), M1_time));
        }
        else if (4 <= i && i < 5)
        {
            Instant M2_time = starttime.plus(millsec*count);
            dataList.add(TimestampedValue.of(KV.of("M1", new Pair<Integer, Double>(i, value[i])), M2_time));
        }
        else
        {
            Instant M3_time = starttime.plus(millsec*count);
            dataList.add(TimestampedValue.of(KV.of("M1", new Pair<Integer, Double>(i, value[i])), M3_time));
        }
        System.out.println("raw_data=" + dataList.get(i));
    }

    // Run on the Spark runner with a local master using 4 threads.
    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    options.setSparkMaster("local[4]");
    Pipeline p = Pipeline.create(options);

    PCollection<KV<String, Pair<Integer, Double>>> data = p.apply("create data with time", Create.timestamped(dataList));
    data.apply("spark_write_on_console", ConsoleIO.Write.out);
    p.run().waitUntilFinish();
}
”
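The timestamps that the loop above attaches to the five elements can be checked without Beam or Spark. The following is only a sketch of that arithmetic (start time plus 2 minutes per element), using java.time instead of Joda-Time. The class name TimestampSketch and the method elementTimes are hypothetical, and the start time is read as UTC here, which is an assumption: the code above uses the JVM default time zone.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam: reproduces only the timestamp arithmetic.
class TimestampSketch {

    // Event time of element i (0-based) is start + step * (i + 1),
    // matching starttime.plus(millsec * count) with count = i + 1.
    static List<Instant> elementTimes(Instant start, Duration step, int n) {
        List<Instant> times = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            times.add(start.plus(step.multipliedBy(i + 1)));
        }
        return times;
    }

    public static void main(String[] args) {
        // Assumption: 2017-07-12 14:00 is interpreted as UTC in this sketch.
        Instant start = LocalDateTime.of(2017, 7, 12, 14, 0)
                .toInstant(ZoneOffset.UTC)
                .plus(Duration.ofHours(8));
        for (Instant t : elementTimes(start, Duration.ofMinutes(2), 5)) {
            System.out.println(t);
        }
    }
}
```

Under the UTC assumption the five elements fall at 22:02, 22:04, 22:06, 22:08 and 22:10 on 2017-07-12, which may help when choosing windows for the later steps.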

Thanks very much

Sincerely yours,

Liang-Sian Lin, Dr.

Oct 20 2017



--
This email may contain ITRI confidential information. If you are not the intended recipient, please do not use or disclose its contents, and destroy this email.

--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
