bq. is the mutator thread safe? See HBASE-17361
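
For what it's worth, here is a minimal sketch of how the timer flush could be serialized with the writes, and how the same flush could be called at checkpoint time, as suggested further down the thread. It is plain Java with no HBase or Flink dependency: BufferingWriter and InMemoryWriter are hypothetical stand-ins for BufferedMutator, and PeriodicFlushSink, invoke and flushOnCheckpoint are illustrative names, not HBase or Flink API. The explicit lock assumes the buffer may not be safe for concurrent use; if it is, the lock is merely redundant.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for BufferedMutator so the sketch runs without HBase.
interface BufferingWriter<T> {
    void write(T record) throws Exception;
    void flush() throws Exception;
    void close() throws Exception;
}

// In-memory stand-in; 'pending' is deliberately NOT thread safe.
class InMemoryWriter implements BufferingWriter<String> {
    private final List<String> pending = new ArrayList<>();
    // Synchronized only so the demo can read it from another thread.
    final List<String> flushed = Collections.synchronizedList(new ArrayList<>());

    @Override public void write(String record) { pending.add(record); }
    @Override public void flush() { flushed.addAll(pending); pending.clear(); }
    @Override public void close() { flush(); }
}

// Illustrative sink: every access to the writer goes through one lock.
class PeriodicFlushSink<T> implements AutoCloseable {
    private final Object lock = new Object();
    private final BufferingWriter<T> writer;
    private final ScheduledExecutorService scheduler;
    private volatile Exception timerError; // failure seen on the timer thread

    PeriodicFlushSink(BufferingWriter<T> writer, long intervalMillis) {
        this.writer = writer;
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "periodic-flush");
            t.setDaemon(true); // do not keep the JVM alive for the timer
            return t;
        });
        this.scheduler.scheduleAtFixedRate(
                this::flushFromTimer, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    private void flushFromTimer() {
        try {
            synchronized (lock) { writer.flush(); }
        } catch (Exception e) {
            timerError = e; // remember instead of ignoring; rethrown on next call
        }
    }

    private void rethrowTimerError() throws Exception {
        Exception e = timerError;
        if (e != null) {
            timerError = null;
            throw new Exception("periodic flush failed", e);
        }
    }

    // Called once per record on the task thread.
    void invoke(T record) throws Exception {
        rethrowTimerError();
        synchronized (lock) { writer.write(record); }
    }

    // Call this from the checkpoint hook so buffered records are not lost.
    void flushOnCheckpoint() throws Exception {
        rethrowTimerError();
        synchronized (lock) { writer.flush(); }
    }

    @Override
    public void close() throws Exception {
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
        synchronized (lock) { writer.close(); }
        rethrowTimerError();
    }
}

class PeriodicFlushSketch {
    public static void main(String[] args) throws Exception {
        InMemoryWriter writer = new InMemoryWriter();
        try (PeriodicFlushSink<String> sink = new PeriodicFlushSink<>(writer, 100)) {
            sink.invoke("event-1");
            sink.invoke("event-2");
            Thread.sleep(400); // low traffic: the timer should flush meanwhile
            System.out.println("flushed so far: " + writer.flushed);
        }
    }
}
```

In a real Flink sink, flushOnCheckpoint would be called from the checkpoint hook (for example CheckpointedFunction#snapshotState), and invoke would wrap the mutate call on the BufferedMutator. Unlike the "// Ignore" in the timer task quoted below, a failure on the timer thread is kept and rethrown on the next write.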
On Wed, May 3, 2017 at 1:52 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi Niels,
> With any kind of buffering you need to be careful when it comes to fault
> tolerance. In your case, you should make sure to flush the buffers when
> checkpointing, otherwise you might lose data because those elements will
> not be re-sent after a failure.
>
> With the periodic timer my only concern would be concurrency issues, i.e.
> is the mutator thread safe?
>
> Best,
> Aljoscha
>
> On 30. Apr 2017, at 09:24, Kamil Dziublinski <kamil.dziublin...@gmail.com>
> wrote:
>
> Hi Niels,
>
> This sounds to me like a great use case for window functions. You
> could partition your data (use keyBy) based on website and then hold your
> window for a certain amount of time. After that you could give your sink
> an already batched object and store it directly. On top of that, if you are
> worried that the data might become too big in a fixed time window, you could
> use a trigger that fires based on both time and size. Although IMO it's no
> problem to have a bigger put for HBase. But you need to test.
> I have a very similar use case with Kafka and HBase and I solved it like
> that.
> Hope that helps.
> On Sat, 29 Apr 2017 at 18:05, Niels Basjes <ni...@basjes.nl> wrote:
>
>> Thanks.
>>
>> The specific table I have here is used for debugging purposes, so at the
>> HBase level I set a TTL of 12 hours on the data.
>> So I'm not worrying about the HFiles.
>> Doing a lot of 'small' calls has an impact on HBase as a whole (not just
>> this table) so I want buffering.
>> With a buffer that can hold 1000 events, when I create 10 events
>> with a single page and I'm the only one on the site (at that moment), the
>> events will be buffered for much too long.
>>
>> I did a quick test and this seems to work for my case.
>> In what situations do you guys expect this code construct to fail? Any
>> edge cases I missed?
>>
>> Niels
>>
>> private transient BufferedMutator mutator = null;
>> private transient Timer timer = null;
>>
>> @Override
>> public void open(Configuration parameters) throws Exception {
>>   org.apache.hadoop.conf.Configuration hbaseConfig =
>>       HBaseConfiguration.create();
>>   Connection connection = ConnectionFactory.createConnection(hbaseConfig);
>>
>>   mutator = connection.getBufferedMutator(
>>       new BufferedMutatorParams(TableName.valueOf(tableName))
>>           .pool(getDefaultExecutor(hbaseConfig))
>>           .writeBufferSize(HBASE_BUFFER_SIZE)
>>   );
>>
>>   timer = new Timer();
>>   timer.schedule(new TimerTask() {
>>     @Override
>>     public void run() {
>>       try {
>>         MySink.this.mutator.flush();
>>       } catch (Exception e) {
>>         // Ignore
>>       }
>>     }
>>   }, HBASE_BUFFER_AUTO_FLUSH_INTERVAL, HBASE_BUFFER_AUTO_FLUSH_INTERVAL);
>> }
>>
>> @Override
>> public void close() throws IOException {
>>   timer.cancel();
>>   mutator.close();
>> }
>>
>>
>> On Sat, Apr 29, 2017 at 4:57 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> I expect a Flink expert to answer your question.
>>>
>>> bq. I get a flush of the buffers at least every few seconds
>>>
>>> From the HBase point of view, during low traffic periods, the above may
>>> result in many small HFiles, leading to more work for compaction.
>>>
>>> FYI
>>>
>>> On Sat, Apr 29, 2017 at 7:32 AM, Niels Basjes <ni...@basjes.nl> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a sink that writes my records into HBase.
>>>>
>>>> The data stream is attached to measurements from an internal testing
>>>> instance of the website.
>>>> As a consequence there are periods of really high load (someone is
>>>> doing a load test) and really low load (only a handful of people are
>>>> testing stuff).
>>>>
>>>> I read the records from Kafka and I want to write the records into
>>>> HBase.
>>>> Because under high load it is more efficient to buffer the writes
>>>> between the client and the server, and as recommended by HBase, I use a
>>>> BufferedMutator.
>>>>
>>>> This BufferedMutator works with a 'fixed size' buffer and under high
>>>> load setting it to a few MiB greatly improves the performance of writing
>>>> to HBase.
>>>> However, under low load you have to wait until the buffer is full and
>>>> that can be a LONG time (hours) when the load is really low.
>>>>
>>>> I want to fire a periodic event into my sink to ensure I get a flush of
>>>> the buffers at least every few seconds.
>>>>
>>>> Simply implement a standard Java TimerTask and fire that using a Timer?
>>>> Or is there a better way of doing that in Flink?
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
>>>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes