[GitHub] hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according

2019-01-10 Thread GitBox
hequn8128 commented on a change in pull request #7418: FLINK-11053 
Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r246739693
 
 

 ##
 File path: docs/dev/connectors/filesystem_sink.md
 ##
 @@ -117,11 +117,12 @@ input.addSink(sink);
 
 
 {% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+import org.apache.flink.api.java.tuple.Tuple2
+val input: DataStream[Tuple2[A, B]] = ??? //we need to use java Tuple2 for the SequenceFileWriter
 
 Review comment:
   Add a blank before `we`. Not sure about the comment. How about: Use Java Tuple2 if we want to use SequenceFileWriter.
   We can also wait for suggestions from the other reviewers.
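   For context, a minimal sketch of what the documented line could look like with that wording (the surrounding imports are assumptions added to make the snippet self-contained, not part of the docs diff):

```scala
// Hypothetical final form of the documented line, following the wording suggested above.
import org.apache.flink.api.java.tuple.Tuple2 // use Java Tuple2 if we want to use SequenceFileWriter
import org.apache.flink.streaming.api.scala._
import org.apache.hadoop.io.{IntWritable, Text}

val input: DataStream[Tuple2[IntWritable, Text]] = ??? // placeholder value, as in the docs
```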


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according

2019-01-10 Thread GitBox
hequn8128 commented on a change in pull request #7418: FLINK-11053 
Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r246739676
 
 

 ##
 File path: docs/dev/connectors/filesystem_sink.md
 ##
 @@ -105,8 +105,8 @@ Example:
 {% highlight java %}
DataStream<Tuple2<IntWritable, Text>> input = ...;
 
-BucketingSink<String> sink = new BucketingSink<String>("/base/path");
-sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));
+BucketingSink<Tuple2<IntWritable, Text>> sink = new BucketingSink<Tuple2<IntWritable, Text>>("/base/path");
+sink.setBucketer(new DateTimeBuckete("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));
 
 Review comment:
   DateTimeBuckete => DateTimeBucketer.
   Maybe we should write `new DateTimeBucketer<>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles"))` to avoid unchecked assignment.
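   On the Scala side the same concern can be addressed by spelling out the bucketer's element type; a sketch, under the assumption that the sink's element type is the Java `Tuple2[IntWritable, Text]` discussed in this thread:

```scala
// Sketch only: explicit element type on DateTimeBucketer, the Scala analogue of the `<>` suggestion above.
import java.time.ZoneId
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
import org.apache.hadoop.io.{IntWritable, Text}

val sink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path")
sink.setBucketer(new DateTimeBucketer[Tuple2[IntWritable, Text]]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
```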


[GitHub] hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according

2019-01-10 Thread GitBox
hequn8128 commented on a change in pull request #7418: FLINK-11053 
Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r246736573
 
 

 ##
 File path: docs/dev/connectors/filesystem_sink.md
 ##
 @@ -117,11 +117,12 @@ input.addSink(sink);
 
 
 {% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+import org.apache.flink.api.java.tuple.Tuple2
+val input: DataStream[Tuple2[A, B]] = ??? //we need to use java Tuple2 for the SequenceFileWriter
 
 Review comment:
   Add a blank before `we`. Not sure about the comment. How about: Use Java Tuple2 if we want to use SequenceFileWriter.
   We can also wait for suggestions from the other reviewers.
   
   Thank you very much for the fix and update. 
   
   Best, 
   Hequn


[GitHub] hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according

2019-01-08 Thread GitBox
hequn8128 commented on a change in pull request #7418: FLINK-11053 
Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r246237842
 
 

 ##
 File path: docs/dev/connectors/filesystem_sink.md
 ##
 @@ -117,11 +117,11 @@ input.addSink(sink);
 
 
 {% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+val input: DataStream[(IntWritable, Text)] = ???
 
-val sink = new BucketingSink[String]("/base/path")
-sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
-sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
+val sink = new BucketingSink[(IntWritable, Text)]("/base/path")
+sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
+sink.setWriter(new StringWriter[(IntWritable, Text)]())
 
 Review comment:
   @123avi What I mean is: don't use the Scala tuple. Use the Java tuple even for the Scala example.
   `val input: DataStream[Tuple2[A, B]]` is different from `val input: DataStream[(A, B)]`. `org.apache.flink.api.java.tuple.Tuple2` is a class in Flink.
   
   I wrote sample code for you. Take a look at the code [here](https://github.com/hequn8128/flink/blob/bucketSinkTest/flink-connectors/flink-connector-filesystem/src/test/scala/org/apache/flink/streaming/connectors/fs/bucketing/ScalaBucketingSinkTest.scala#L62). You can also try to run the test. It works well.
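   For readers who do not follow the link, a minimal self-contained sketch of the approach (assuming the Flink 1.7 filesystem connector and a Hadoop dependency on the classpath; this is not a copy of the linked test):

```scala
// Minimal sketch: a Scala example that uses Flink's Java Tuple2 so SequenceFileWriter can be used.
import java.time.ZoneId

import org.apache.flink.api.java.tuple.Tuple2 // Java Tuple2, required by SequenceFileWriter
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
import org.apache.hadoop.io.{IntWritable, Text}

val input: DataStream[Tuple2[IntWritable, Text]] = ??? // placeholder, as in the docs

val sink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path")
sink.setBucketer(new DateTimeBucketer[Tuple2[IntWritable, Text]]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
sink.setWriter(new SequenceFileWriter[IntWritable, Text]())

input.addSink(sink)
```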


[GitHub] hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according

2019-01-08 Thread GitBox
hequn8128 commented on a change in pull request #7418: FLINK-11053 
Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r246013215
 
 

 ##
 File path: docs/dev/connectors/filesystem_sink.md
 ##
 @@ -117,11 +117,11 @@ input.addSink(sink);
 
 
 {% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+val input: DataStream[(IntWritable, Text)] = ???
 
-val sink = new BucketingSink[String]("/base/path")
-sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
-sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
+val sink = new BucketingSink[(IntWritable, Text)]("/base/path")
+sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
+sink.setWriter(new StringWriter[(IntWritable, Text)]())
 
 Review comment:
   @123avi Have you tried the Java Tuple2 for the example? `org.apache.flink.api.java.tuple.Tuple2`.
   
   The code should be: 
   `val input: DataStream[Tuple2[A, B]] = ???` 
   not 
   `val input: DataStream[(A, B)] = ???`
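   An illustrative sketch of why the two annotations are not interchangeable (the rename to `JTuple2` is only for clarity here):

```scala
// `(A, B)` is scala.Tuple2[A, B]; the imported class below is Flink's Java Tuple2 -- a different type.
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.hadoop.io.{IntWritable, Text}

val javaTuple: JTuple2[IntWritable, Text] = new JTuple2(new IntWritable(1), new Text("a"))
val scalaTuple: (IntWritable, Text) = (new IntWritable(1), new Text("a"))
// The two values have unrelated types, so a DataStream of one cannot be used where the other is expected.
```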
   


[GitHub] hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according

2019-01-07 Thread GitBox
hequn8128 commented on a change in pull request #7418: FLINK-11053 
Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r245853342
 
 

 ##
 File path: docs/dev/connectors/filesystem_sink.md
 ##
 @@ -117,11 +117,11 @@ input.addSink(sink);
 
 
 {% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+val input: DataStream[(IntWritable, Text)] = ???
 
-val sink = new BucketingSink[String]("/base/path")
-sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
-sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
+val sink = new BucketingSink[(IntWritable, Text)]("/base/path")
+sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
+sink.setWriter(new StringWriter[(IntWritable, Text)]())
 
 Review comment:
   @123avi You are right. I think it's better not to change the type of the DataStream, i.e., keep it as `Tuple2`. The `SequenceFileWriter` extends `StreamWriterBase<Tuple2<K, V>>`, and `StreamWriterBase<T>` implements `Writer<T>`.
   In this example, `T` is `Tuple2[IntWritable, Text]`, `K` is `IntWritable` and `V` is `Text`.
   What do you think?
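   A short sketch of how those type parameters line up in code, under the class signatures described above:

```scala
// SequenceFileWriter[K, V] is a Writer[Tuple2[K, V]], so it fits a BucketingSink[Tuple2[IntWritable, Text]].
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.connectors.fs.{SequenceFileWriter, Writer}
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
import org.apache.hadoop.io.{IntWritable, Text}

val writer: Writer[Tuple2[IntWritable, Text]] = new SequenceFileWriter[IntWritable, Text]()
val sink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path")
sink.setWriter(writer) // T = Tuple2[IntWritable, Text], K = IntWritable, V = Text
```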


[GitHub] hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according

2019-01-06 Thread GitBox
hequn8128 commented on a change in pull request #7418: FLINK-11053 
Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r245535022
 
 

 ##
 File path: docs/dev/connectors/filesystem_sink.md
 ##
 @@ -117,11 +117,11 @@ input.addSink(sink);
 
 
 {% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+val input: DataStream[(IntWritable, Text)] = ???
 
-val sink = new BucketingSink[String]("/base/path")
-sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
-sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
+val sink = new BucketingSink[(IntWritable, Text)]("/base/path")
+sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
+sink.setWriter(new StringWriter[(IntWritable, Text)]())
 
 Review comment:
   Revert to `SequenceFileWriter`. The default writer is `StringWriter`.
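   A small sketch of the contrast, assuming `StringWriter` is still the default writer in this Flink version:

```scala
// StringWriter is BucketingSink's default writer, so setting it explicitly shows nothing new;
// the docs example should set SequenceFileWriter to demonstrate a non-default writer.
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
import org.apache.hadoop.io.{IntWritable, Text}

val defaultSink = new BucketingSink[String]("/base/path") // writes with StringWriter by default

val seqSink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path")
seqSink.setWriter(new SequenceFileWriter[IntWritable, Text]()) // the case the example is meant to show
```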


[GitHub] hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according

2019-01-06 Thread GitBox
hequn8128 commented on a change in pull request #7418: FLINK-11053 
Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r245535017
 
 

 ##
 File path: docs/dev/connectors/filesystem_sink.md
 ##
 @@ -117,11 +117,11 @@ input.addSink(sink);
 
 
 {% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+val input: DataStream[(IntWritable, Text)] = ???
 
-val sink = new BucketingSink[String]("/base/path")
-sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
-sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
+val sink = new BucketingSink[(IntWritable, Text)]("/base/path")
+sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
 
 Review comment:
   Add timezone parameter.
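   A one-line sketch of the bucketer call with the timezone kept, matching the constructor used in the original snippet:

```scala
// DateTimeBucketer with both the format string and an explicit time zone, as in the original example.
import java.time.ZoneId
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer

val bucketer = new DateTimeBucketer[String]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles"))
```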

