[GitHub] hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r246739693

File path: docs/dev/connectors/filesystem_sink.md

@@ -117,11 +117,12 @@ input.addSink(sink);
{% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+import org.apache.flink.api.java.tuple.Tuple2
+val input: DataStream[Tuple2[A, B]] = ??? //we need to use java Tuple2 for the SequenceFileWriter

Review comment:
Add a blank before `we`. Not sure about the comment. How about: "Use java Tuple2 if we want to use SequenceFileWriter." We can also wait for suggestions from the other reviewers.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r246739676

File path: docs/dev/connectors/filesystem_sink.md

@@ -105,8 +105,8 @@ Example:
{% highlight java %}
DataStream<Tuple2<IntWritable, Text>> input = ...;

-BucketingSink<String> sink = new BucketingSink<String>("/base/path");
-sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));
+BucketingSink<Tuple2<IntWritable, Text>> sink = new BucketingSink<Tuple2<IntWritable, Text>>("/base/path");
+sink.setBucketer(new DateTimeBuckete("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));

Review comment:
DateTimeBuckete => DateTimeBucketer. Maybe we should write `new DateTimeBucketer<>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles"))` to avoid an unchecked assignment.
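The unchecked-assignment point can be sketched without Flink on the classpath. `DateTimeBucketerStub` below is a hypothetical stand-in for Flink's `DateTimeBucketer`; only the raw-type-vs-diamond instantiation pattern is what matters:

```java
import java.time.ZoneId;

// Sketch of raw-type vs diamond instantiation. DateTimeBucketerStub is a
// made-up stand-in for Flink's DateTimeBucketer, not the real class.
class DiamondSketch {
    static class DateTimeBucketerStub<T> {
        final String format;
        final ZoneId zone;
        DateTimeBucketerStub(String format, ZoneId zone) {
            this.format = format;
            this.zone = zone;
        }
    }

    public static void main(String[] args) {
        // Raw type: compiles, but assigning it to a parameterized variable
        // produces an "unchecked assignment" warning:
        // DateTimeBucketerStub<String> raw =
        //     new DateTimeBucketerStub("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles"));

        // Diamond: the type argument is inferred, no warning.
        DateTimeBucketerStub<String> ok =
            new DateTimeBucketerStub<>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles"));
        System.out.println(ok.format);
    }
}
```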
[GitHub] hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r246736573

File path: docs/dev/connectors/filesystem_sink.md

@@ -117,11 +117,12 @@ input.addSink(sink);
{% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+import org.apache.flink.api.java.tuple.Tuple2
+val input: DataStream[Tuple2[A, B]] = ??? //we need to use java Tuple2 for the SequenceFileWriter

Review comment:
Add a blank before `we`. Not sure about the comment. How about: "Use java Tuple2 if we want to use SequenceFileWriter." We can also wait for suggestions from the other reviewers.

Thank you very much for the fix and update.

Best, Hequn
[GitHub] hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r246237842

File path: docs/dev/connectors/filesystem_sink.md

@@ -117,11 +117,11 @@ input.addSink(sink);
{% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+val input: DataStream[(IntWritable, Text)] = ???

-val sink = new BucketingSink[String]("/base/path")
-sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
-sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
+val sink = new BucketingSink[(IntWritable, Text)]("/base/path")
+sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
+sink.setWriter(new StringWriter[(IntWritable, Text)]())

Review comment:
@123avi What I mean is: don't use the Scala tuple; use the Java tuple even for the Scala example. `val input: DataStream[Tuple2[A, B]]` is different from `val input: DataStream[(A, B)]`. `org.apache.flink.api.java.tuple.Tuple2` is a class in Flink. I wrote sample code for you; take a look at the code [here](https://github.com/hequn8128/flink/blob/bucketSinkTest/flink-connectors/flink-connector-filesystem/src/test/scala/org/apache/flink/streaming/connectors/fs/bucketing/ScalaBucketingSinkTest.scala#L62). You can also try to run the test; it works well.
[GitHub] hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r246013215

File path: docs/dev/connectors/filesystem_sink.md

@@ -117,11 +117,11 @@ input.addSink(sink);
{% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+val input: DataStream[(IntWritable, Text)] = ???

-val sink = new BucketingSink[String]("/base/path")
-sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
-sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
+val sink = new BucketingSink[(IntWritable, Text)]("/base/path")
+sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
+sink.setWriter(new StringWriter[(IntWritable, Text)]())

Review comment:
@123avi Have you tried java Tuple2 (`org.apache.flink.api.java.tuple.Tuple2`) for the example? The code should be `val input: DataStream[Tuple2[A, B]] = ???`, not `val input: DataStream[(A, B)] = ???`.
[GitHub] hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r245853342

File path: docs/dev/connectors/filesystem_sink.md

@@ -117,11 +117,11 @@ input.addSink(sink);
{% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+val input: DataStream[(IntWritable, Text)] = ???

-val sink = new BucketingSink[String]("/base/path")
-sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
-sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
+val sink = new BucketingSink[(IntWritable, Text)]("/base/path")
+sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
+sink.setWriter(new StringWriter[(IntWritable, Text)]())

Review comment:
@123avi You are right. I think it's better not to change the type of the DataStream, i.e., keep it as `Tuple2`. `SequenceFileWriter<K, V>` extends `StreamWriterBase<Tuple2<K, V>>`, and `StreamWriterBase<T>` implements `Writer<T>`. In this example, `T` is `Tuple2[IntWritable, Text]`, `K` is `IntWritable`, and `V` is `Text`. What do you think?
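The hierarchy described in the comment above can be sketched with stub types. The names mirror Flink's `Writer`/`StreamWriterBase`/`SequenceFileWriter`, but these are simplified stand-ins (the real classes write Hadoop files), shown only to illustrate how `T` resolves to `Tuple2<K, V>`:

```java
// Stub sketch of the writer hierarchy discussed in the review. These are
// hypothetical stand-ins, not the real flink-connector-filesystem classes;
// Tuple2 plays the role of org.apache.flink.api.java.tuple.Tuple2.
class WriterHierarchySketch {
    static class Tuple2<K, V> {
        final K f0;
        final V f1;
        Tuple2(K f0, V f1) { this.f0 = f0; this.f1 = f1; }
    }

    interface Writer<T> { String write(T element); }

    static abstract class StreamWriterBase<T> implements Writer<T> { }

    // Fixing K and V pins StreamWriterBase's T to Tuple2<K, V>, which is why
    // the DataStream must carry the Java Tuple2 type.
    static class SequenceFileWriter<K, V> extends StreamWriterBase<Tuple2<K, V>> {
        public String write(Tuple2<K, V> element) {
            return "key=" + element.f0 + " value=" + element.f1;
        }
    }

    public static void main(String[] args) {
        Writer<Tuple2<Integer, String>> writer = new SequenceFileWriter<>();
        System.out.println(writer.write(new Tuple2<>(1, "one")));
    }
}
```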
[GitHub] hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r245535022

File path: docs/dev/connectors/filesystem_sink.md

@@ -117,11 +117,11 @@ input.addSink(sink);
{% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+val input: DataStream[(IntWritable, Text)] = ???

-val sink = new BucketingSink[String]("/base/path")
-sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
-sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
+val sink = new BucketingSink[(IntWritable, Text)]("/base/path")
+sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
+sink.setWriter(new StringWriter[(IntWritable, Text)]())

Review comment:
Revert to `SequenceFileWriter`; the default writer is already `StringWriter`.
[GitHub] hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
hequn8128 commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r245535017

File path: docs/dev/connectors/filesystem_sink.md

@@ -117,11 +117,11 @@ input.addSink(sink);
{% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+val input: DataStream[(IntWritable, Text)] = ???

-val sink = new BucketingSink[String]("/base/path")
-sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
-sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
+val sink = new BucketingSink[(IntWritable, Text)]("/base/path")
+sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))

Review comment:
Add the timezone parameter.
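The timezone parameter matters because the bucketer formats each record's processing time in that zone to build the bucket path. A minimal sketch of that formatting step using plain `java.time` (this is not Flink's actual `DateTimeBucketer`, just the pattern-plus-zone behavior it relies on):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Sketch of how a DateTimeBucketer-style bucket id is derived from a format
// string and a time zone; the pattern and zone are taken from the docs example.
class BucketIdSketch {
    static String bucketId(long epochMillis, String pattern, ZoneId zone) {
        return DateTimeFormatter.ofPattern(pattern)
                .withZone(zone)
                .format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // Epoch 0 is 1970-01-01T00:00Z, i.e. 1969-12-31 16:00 in Los Angeles,
        // so the bucket id below is "1969-12-31--1600".
        System.out.println(
            bucketId(0L, "yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));
    }
}
```

Omitting the zone makes bucket boundaries depend on the JVM's default timezone, which is why the review asks to keep the explicit `ZoneId`.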