Re: Access to datastream from BucketSink - RESOLVED

2017-08-16 Thread ant burton
I have resolved my issue. Thank you for your help.

The following code gives me access to each element, which I can use to determine
the bucket directory name.

import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.hadoop.fs.Path;

// Bucketer from the bucketing package (the one BucketingSink uses), typed on the
// stream's element type so that getBucketPath receives each String element.
public class S3Bucketer implements Bucketer<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public Path getBucketPath(Clock clock, Path basePath, String element) {
        // Now that we have access to the element, we can
        // generate the S3 bucket directory name from it.
        // Placeholder: replace with logic that derives the name from `element`.
        // Hadoop's Path rejects empty strings, so the name must be non-empty.
        String bucketName = "some-path-derived-from-element";

        // Resolve the bucket directory relative to the sink's base path.
        return new Path(basePath, bucketName);
    }
}
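Because getBucketPath receives every element, the remaining work is turning an element into a directory name. A minimal, stdlib-only sketch of that step, assuming a hypothetical comma-separated element format whose first field identifies the bucket; `BucketNames` and `fromElement` are illustrative names, not Flink API.

```java
import java.util.Locale;

/**
 * Hypothetical helper: derives a bucket directory name from one String element.
 * Assumes (illustration only) comma-separated records whose first field names
 * the bucket, e.g. "customer-42,click,2017-08-16T13:09:00Z".
 */
final class BucketNames {
    private BucketNames() {}

    /** Fallback bucket for elements whose first field is missing or blank. */
    static final String UNKNOWN = "unknown";

    static String fromElement(String element) {
        if (element == null) {
            return UNKNOWN;
        }
        // Take everything before the first comma (the whole string if there is none).
        int comma = element.indexOf(',');
        String key = (comma >= 0 ? element.substring(0, comma) : element).trim();

        // Keep the name S3-friendly: lower-case it, then replace anything other than
        // a-z, 0-9, '_' and '-' (including '/' and spaces) with '_'.
        String sanitized = key.toLowerCase(Locale.ROOT).replaceAll("[^a-z0-9_-]", "_");

        // A path component must not be empty.
        return sanitized.isEmpty() ? UNKNOWN : sanitized;
    }
}
```

Inside S3Bucketer, getBucketPath could then return `new Path(basePath, BucketNames.fromElement(element))`, which keeps every bucket under the sink's base path.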


> On 16 Aug 2017, at 16:06, Kostas Kloudas wrote:



Re: Access to datastream from BucketSink

2017-08-16 Thread ant burton
Thank you for your help. It’s greatly appreciated.

My aim is to be able to “use a property of the element to determine the bucket
directory”.

With your suggestions, this is what I have so far. It’s obviously wrong, but I
hope I’m getting closer.

Is it correct to still implement Bucketer and just change where it is imported
from, or do I need to import BucketingSink?

import org.apache.hadoop.fs.Path;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer; // I think this is wrong
import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.connectors.fs.Clock;

public class S3Bucketer implements Bucketer<String> {
    public Path getBucketPath(Clock clock, Path basePath, String element) {
        // Now that we have access to element, we can
        // generate an S3 filename path from it
        String s3_filename_path = "";

        return new Path(s3_filename_path);
    }
}

Apologies, my Java is limited at present.

Thanks,

> On 16 Aug 2017, at 16:06, Kostas Kloudas wrote:



Re: Access to datastream from BucketSink

2017-08-16 Thread Kostas Kloudas
Hi Ant,

I think you are implementing the wrong Bucketer. 
This seems to be the one for the RollingSink which is deprecated. 
Is this correct?

You should implement the BucketingSink one, which is in the package:

org.apache.flink.streaming.connectors.fs.bucketing

That one requires implementing a single method, with the signature:

Path getBucketPath(Clock clock, Path basePath, T element);

which, from what I understand of your requirements, gives you access
to the element that you need.
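To make the point concrete: the sink calls the bucketer once for every element it receives, so the bucketer never needs a handle on the DataStream. Below is a simplified, stdlib-only model of that routing, not Flink's implementation; `BucketRoutingModel` is a hypothetical name, the `Function<T, String>` stands in for getBucketPath (the Clock is omitted), and the real BucketingSink additionally manages part files, rolling and checkpoints.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Simplified model of how a bucketing sink uses its bucketer: one bucketer
 * call per element, with the element as the argument. NOT Flink's code.
 */
final class BucketRoutingModel<T> {
    private final String basePath;
    // Stand-in for Bucketer<T>#getBucketPath(clock, basePath, element).
    private final Function<T, String> bucketNameOf;
    // bucket directory -> elements that would be written into it, in arrival order
    private final Map<String, List<T>> written = new LinkedHashMap<>();

    BucketRoutingModel(String basePath, Function<T, String> bucketNameOf) {
        this.basePath = basePath;
        this.bucketNameOf = bucketNameOf;
    }

    /** Plays the role of the sink's per-element invoke(): route one element. */
    void invoke(T element) {
        String bucketDir = basePath + "/" + bucketNameOf.apply(element);
        written.computeIfAbsent(bucketDir, dir -> new ArrayList<>()).add(element);
    }

    /** Bucket directories seen so far, each with the elements routed to it. */
    Map<String, List<T>> buckets() {
        return written;
    }
}
```

For example, with base path "s3a://flink" and the first comma-separated field as the bucket name, the elements "a,1", "b,2" and "a,3" end up with "a,1" and "a,3" under s3a://flink/a and "b,2" under s3a://flink/b.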

Cheers,
Kostas

> On Aug 16, 2017, at 3:31 PM, ant burton wrote:



Re: Access to datastream from BucketSink

2017-08-16 Thread ant burton

Thanks Kostas,

I’m narrowing in on a solution:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/filesystem_sink.html
says “You can also specify a custom bucketer by using setBucketer() on a
BucketingSink. If desired, the bucketer can use a property of the element or
tuple to determine the bucket directory.”

BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));

Therefore I’ve created a skeleton class:

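import java.io.IOException;
import java.io.ObjectInputStream;

// Note: this is the Bucketer interface of the deprecated RollingSink,
// not the one in the fs.bucketing package that BucketingSink uses.
import org.apache.flink.streaming.connectors.fs.Bucketer;
import org.apache.hadoop.fs.Path;

public class S3Bucketer implements Bucketer {
    private static final long serialVersionUID = 1L;

    // Not used yet; no longer final, since nothing assigns it.
    private String formatString;

    public S3Bucketer() {
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
    }

    @Override
    public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
        return true;
    }

    @Override
    public Path getNextBucketPath(Path basePath) {
        // Only basePath is passed in: this interface never sees the element.
        return new Path(basePath + "/some-path-that-I-need-create-from-the-stream");
    }
}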

My question now is: how do I access the data stream from within the S3Bucketer,
so that I can generate a filename based on the data in the stream?

Thanks,

> On 16 Aug 2017, at 12:55, Kostas Kloudas wrote:



Re: Access to datastream from BucketSink

2017-08-16 Thread Kostas Kloudas
In the second link for the BucketingSink, you can set your 
own Bucketer using the setBucketer method. You do not have to 
implement your own sink from scratch.

Kostas

> On Aug 16, 2017, at 1:39 PM, ant burton wrote:



Re: Access to datastream from BucketSink

2017-08-16 Thread ant burton
or rather 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
 



> On 16 Aug 2017, at 12:24, Kostas Kloudas wrote:



Re: Access to datastream from BucketSink

2017-08-16 Thread ant burton

Am I on the right path with the following?

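import org.apache.flink.streaming.api.functions.sink.SinkFunction;

class S3SinkFunc implements SinkFunction<String> {
    @Override
    public void invoke(String element) throws Exception {
        System.out.println(element);
        // don't have access to dataStream to call .addSink() :-(
    }
}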

Thanks,

> On 16 Aug 2017, at 12:24, Kostas Kloudas wrote:



Re: Access to datastream from BucketSink

2017-08-16 Thread Kostas Kloudas
Hi Ant,

I think you can do it by implementing your own Bucketer.

Cheers,
Kostas

> On Aug 16, 2017, at 1:09 PM, ant burton wrote:



Access to datastream from BucketSink

2017-08-16 Thread ant burton
Hello,

Given 

// Set StreamExecutionEnvironment
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

// Use event time (the timestamps carried by the events) as the time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Add source (input stream)
DataStream<String> dataStream = StreamUtil.getDataStream(env, params);

How can I construct the s3_filename from the content of an event? It seems that
whenever I attempt this, I have access either to an event or to .addSink, but
not both.

dataStream.addSink(new BucketingSink<String>("s3a://flink/" + s3_filename));
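Since the job sets TimeCharacteristic.EventTime, note that, as far as I know, DateTimeBucketer names buckets from the Clock passed to getBucketPath (the time at which the sink processes the element), not from the event's own timestamp. An event-time bucket name therefore has to be computed from the element itself. A minimal stdlib sketch, assuming a hypothetical comma-separated element whose third field is an ISO-8601 UTC instant; `EventTimeBuckets` and `hourlyBucket` are illustrative names only.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

/**
 * Hypothetical helper: builds an hourly bucket name from the event's own
 * timestamp rather than the wall clock. Assumes (illustration only) that the
 * third comma-separated field is an instant such as "2017-08-16T13:09:00Z".
 */
final class EventTimeBuckets {
    private EventTimeBuckets() {}

    // Hourly "yyyy-MM-dd--HH" layout, rendered in UTC.
    private static final DateTimeFormatter HOURLY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    /** Fallback bucket for elements without a parseable timestamp. */
    static final String NO_TIMESTAMP = "no-timestamp";

    static String hourlyBucket(String element) {
        // -1 keeps trailing empty fields, so field positions stay stable.
        String[] fields = element.split(",", -1);
        if (fields.length < 3) {
            return NO_TIMESTAMP;
        }
        try {
            Instant eventTime = Instant.parse(fields[2].trim());
            return HOURLY.format(eventTime);
        } catch (DateTimeParseException e) {
            return NO_TIMESTAMP;
        }
    }
}
```

A custom getBucketPath could then return `new Path(basePath, EventTimeBuckets.hourlyBucket(element))`, so late events still land in the hour they belong to.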


Thanks,