Re: BucketAssigner - Confusion

2019-04-03 Thread Chesnay Schepler
BucketID is a variable type, and conceptually you can use any type so 
long as you can provide a serializer for it 
(BucketAssigner#getSerializer). The documentation is wrong in this instance.
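
To make the "variable type" point concrete, here is a minimal, self-contained sketch. SimpleBucketAssigner and IdSerializer are hypothetical, simplified stand-ins written for this example; they are not the Flink interfaces (the real getBucketId also takes a Context, and the real serializer type is SimpleVersionedSerializer). The sketch only shows that the bucket ID can be any type, here an Integer, as long as a matching serializer is supplied. As far as I understand it, the serializer is needed because the sink has to turn the bucket ID into bytes when it stores its state and read it back on restore.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a bucket-ID serializer (not Flink's SimpleVersionedSerializer).
interface IdSerializer<T> {
    byte[] serialize(T value);
    T deserialize(byte[] bytes);
}

// Hypothetical stand-in for BucketAssigner: BucketID is a type parameter, not a fixed String.
interface SimpleBucketAssigner<IN, BucketID> {
    BucketID getBucketId(IN element);
    IdSerializer<BucketID> getSerializer();
}

// Serializer for Integer bucket IDs: 4 bytes, big-endian.
final class IntIdSerializer implements IdSerializer<Integer> {
    static final IntIdSerializer INSTANCE = new IntIdSerializer();

    @Override
    public byte[] serialize(Integer value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    @Override
    public Integer deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }
}

// An assigner whose bucket ID is an Integer: elements are bucketed by their length.
final class LengthBucketAssigner implements SimpleBucketAssigner<String, Integer> {
    @Override
    public Integer getBucketId(String element) {
        return element.length();
    }

    @Override
    public IdSerializer<Integer> getSerializer() {
        // The serializer's type must match the BucketID type chosen above.
        return IntIdSerializer.INSTANCE;
    }
}
```

With a String bucket ID, the same pattern applies: the assigner returns a String and hands back a String serializer, which is what SimpleVersionedStringSerializer.INSTANCE does in the code from the original question.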


The convenience Flink APIs 
(StreamingFileSink#forRowFormat/StreamingFileSink#forBulkFormat) default 
to String bucket IDs, but you can change the type by setting both an 
assigner and a policy via "withBucketAssignerAndPolicy". You should be 
able to use "DefaultRollingPolicy.create().build()" as a default policy.


On 02/04/2019 20:18, Jeff Crane wrote:
According to my IDE (JetBrains), I get an error with the getBucketId(IN, 
Context) signature requiring a return type of String (Flink 1.7 libs), so I 
still don't think BucketID is a variable type.

I still don't understand the role of:

    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }

Where does that come into play, if getBucketId returns a String anyway?









Re: BucketAssigner - Confusion

2019-04-02 Thread Jeff Crane
According to my IDE (JetBrains), I get an error with the getBucketId(IN, 
Context) signature requiring a return type of String (Flink 1.7 libs), so I 
still don't think BucketID is a variable type.

I still don't understand the role of:

    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }

Where does that come into play, if getBucketId returns a String anyway?




BucketAssigner - Confusion

2019-04-01 Thread Jeff Crane
I have had trouble understanding the documentation for BucketAssigner:

    BucketID getBucketId(IN element, BucketAssigner.Context context)
    SimpleVersionedSerializer<BucketID> getSerializer()

First of all, I don't understand what the type "BucketID" means. I assume 
that's the return type of getBucketId, which doesn't make sense. The 
description says getBucketId (returns?) "A string representing the identifier 
of the bucket". So BucketID is not a type, it's always a string?

Based on the docs, I implemented it like this, which doesn't write anything!

    public final class CustomBucketAssigner implements BucketAssigner<MyEvent, String> {

        @Override
        public String getBucketId(final MyEvent element, final Context context) {
            // Bucket by the current watermark, at minute granularity.
            DateTime dateTimeL = new DateTime(context.currentWatermark());

            return String.join("_",
                String.valueOf(dateTimeL.getYear()),
                String.valueOf(dateTimeL.getMonthOfYear()),
                String.valueOf(dateTimeL.getDayOfMonth()),
                String.valueOf(dateTimeL.getHourOfDay()),
                String.valueOf(dateTimeL.getMinuteOfHour())
            );
        }

        // I assume String because BucketID is always a string?
        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

Can someone explain what BucketAssigner is supposed to do, in plainer 
English? I don't think the docs are clear and I'm lost.
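
For reference, the bucket ID string that the assigner above builds can be reproduced outside Flink, which makes it easy to check what a given timestamp maps to. The sketch below rests on a few assumptions: it uses java.time rather than the DateTime class from the message (presumably Joda-Time), it pins the zone to UTC (Joda's DateTime(long) constructor uses the JVM default zone instead), and the class name MinuteBucketId is hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

// Hypothetical helper: builds a "year_month_day_hour_minute" bucket ID from epoch milliseconds.
final class MinuteBucketId {
    private MinuteBucketId() {}

    static String of(long epochMillis) {
        // Interpret the timestamp in UTC (an assumption; the original code uses the default zone).
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        return String.join("_",
                String.valueOf(t.getYear()),
                String.valueOf(t.getMonthValue()),
                String.valueOf(t.getDayOfMonth()),
                String.valueOf(t.getHour()),
                String.valueOf(t.getMinute()));
    }
}
```

For example, MinuteBucketId.of(1554144254000L) yields "2019_4_1_18_44". The fields are not zero-padded, so IDs built this way do not sort lexicographically in time order.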