Hi,

If it's all right with you, I would like to try it.

Thanks

Sincerely
Rick

-----Original Message-----
From: Jean-Baptiste Onofré [mailto:[email protected]]
Sent: Friday, October 20, 2017 1:32 PM
To: [email protected]
Subject: Re: How to use ConsoleIO SDK

Hi,

I started to work on the ConsoleIO (and SocketIO too), but it's not yet merged.

I can provide a SNAPSHOT to you if you want to try it.

Regards
JB

On 10/20/2017 04:14 AM, [email protected] wrote:
> Dear sir,
>
> I have a question about how to use ConsoleIO in the Beam Java SDK.
>
> My objective (highlighted in yellow) is to write the PCollection
> "data" to the console, and then use it (type: RDD?) as another
> variable for further work.
>
> If you need any further information, please let me know and I will
> provide it as soon as possible.
>
> I am looking forward to hearing from you.
>
> My Java code is as follows:
>
> “
>
> import java.io.IOException;
>
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.runners.spark.SparkRunner;
> import org.apache.beam.runners.spark.io.ConsoleIO;
> import org.apache.beam.runners.spark.SparkPipelineOptions;
>
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.TimestampedValue;
>
> import javafx.util.Pair;
>
> import org.joda.time.Duration;
> import org.joda.time.Instant;
> import org.joda.time.MutableDateTime;
>
> public static void main(String[] args) throws IOException {
>
>     MutableDateTime mutableNow = Instant.now().toMutableDateTime();
>     mutableNow.setDateTime(2017, 7, 12, 14, 0, 0, 0);
>     Instant starttime = mutableNow.toInstant().plus(8*60*60*1000);
>
>     int min;
>     int sec;
>     int millsec;
>     min = 2;
>     sec = min*60;
>     millsec = sec*1000;
>
>     double[] value = new double[] {1.0, 2.0, 3.0, 4.0, 5.0};
>     List<TimestampedValue<KV<String, Pair<Integer, Double>>>> dataList =
>         new ArrayList<>();
>     int n = value.length;
>     int count = 0;
>
>     for (int i = 0; i < n; i++)
>     {
>         count = count+1;
>         if (i <= 3)
>         {
>             Instant M1_time = starttime.plus(millsec*count);
>             dataList.add(TimestampedValue.of(
>                 KV.of("M1", new Pair<Integer, Double>(i, value[i])), M1_time));
>         }
>         else if (4 <= i && i < 5)
>         {
>             Instant M2_time = starttime.plus(millsec*count);
>             dataList.add(TimestampedValue.of(
>                 KV.of("M1", new Pair<Integer, Double>(i, value[i])), M2_time));
>         }
>         else
>         {
>             Instant M3_time = starttime.plus(millsec*count);
>             dataList.add(TimestampedValue.of(
>                 KV.of("M1", new Pair<Integer, Double>(i, value[i])), M3_time));
>         }
>         System.out.println("raw_data=" + dataList.get(i));
>     }
>
>     SparkPipelineOptions options =
>         PipelineOptionsFactory.as(SparkPipelineOptions.class);
>     options.setRunner(SparkRunner.class);
>     options.setSparkMaster("local[4]");
>
>     Pipeline p = Pipeline.create(options);
>     PCollection<KV<String, Pair<Integer, Double>>> data =
>         p.apply("create data with time", Create.timestamped(dataList));
>     data.apply("spark_write_on_console", ConsoleIO.Write.out);
>     p.run().waitUntilFinish();
>
> ”
>
> Thanks very much
>
> Sincerely yours,
>
> Liang-Sian Lin, Dr.
>
> Oct 20 2017
>
>
>
> --
> 本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀
> 此信件。 This email may contain confidential information. Please do not
> use or disclose it in any way and delete it if you are not the intended 
> recipient.
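
For reference, the timestamp arithmetic in the quoted code (a fixed start
time shifted by eight hours, then one element every two minutes) can be
checked on its own, without Beam or Spark. The following is only a minimal
sketch using java.time instead of Joda-Time. The class name TimestampSketch
and the helper spacedTimestamps are hypothetical names introduced for
illustration, and the sketch assumes the wall-clock time 2017-07-12 14:00 is
interpreted as UTC, which may differ from the zone the quoted code ends up
using.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of Beam or of the quoted code.
final class TimestampSketch {

    // Returns `count` instants; the k-th one (1-based) is start + k * step,
    // mirroring starttime.plus(millsec * count) in the quoted loop.
    static List<Instant> spacedTimestamps(Instant start, Duration step, int count) {
        List<Instant> out = new ArrayList<>();
        for (int k = 1; k <= count; k++) {
            out.add(start.plus(step.multipliedBy(k)));
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumption: 2017-07-12 14:00 is taken as UTC here.
        Instant base = LocalDateTime.of(2017, 7, 12, 14, 0).toInstant(ZoneOffset.UTC);
        // Same eight-hour shift as plus(8*60*60*1000) in the quoted code.
        Instant start = base.plus(Duration.ofHours(8));

        double[] value = {1.0, 2.0, 3.0, 4.0, 5.0};
        List<Instant> times = spacedTimestamps(start, Duration.ofMinutes(2), value.length);

        // Print each (index, value) pair with the timestamp it would carry.
        for (int i = 0; i < value.length; i++) {
            System.out.println("raw_data=(" + i + ", " + value[i] + ") @ " + times.get(i));
        }
    }
}
```

Printing the list this way only confirms the input data. It does not replace
a console sink inside the pipeline, which is what ConsoleIO is meant for.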

--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com


