#define FIXED_LATENCY_RECORD_A2DP (25 * PA_USEC_PER_MSEC)
#define FIXED_LATENCY_RECORD_SCO (25 * PA_USEC_PER_MSEC)
@@ -660,6 +659,38 @@ static int a2dp_process_push(struct userdata *u) {
return ret;
}
+/* Shrink the kernel send buffer of the stream socket (u->stream_fd) so that it
+ * has room for about two blocks of u->write_block_size bytes.
+ *
+ * Nothing is returned: a failing getsockopt()/setsockopt() is only logged as a
+ * warning and the buffer size is left as it was. */
+static void update_buffer_size(struct userdata *u) {
+    int old_bufsize;
+    socklen_t len = sizeof(int);
+    int ret;
+
+    ret = getsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &old_bufsize, &len);
+    if (ret == -1) {
+        pa_log_warn("Changing bluetooth buffer size: Failed to getsockopt(SO_SNDBUF): %s", pa_cstrerror(errno));
+    } else {
+        int new_bufsize;
+
+        /* Set send buffer size as small as possible. The minimum value is 1024 according to the
+         * socket man page. The data is written to the socket in chunks of write_block_size, so
+         * there should at least be room for two chunks in the buffer. Generally, write_block_size
+         * is larger than 512. If not, use the next multiple of write_block_size which is larger
+         * than 1024. */
+        new_bufsize = 2 * u->write_block_size;
+        if (new_bufsize < 1024) {
+            /* A write_block_size of 0 would make the rounding below divide by
+             * zero, so fall back to the plain minimum in that case. */
+            if (u->write_block_size > 0)
+                new_bufsize = (1024 / u->write_block_size + 1) * u->write_block_size;
+            else
+                new_bufsize = 1024;
+        }
+
+        /* The kernel internally doubles the buffer size that was set by setsockopt and getsockopt
+         * returns the doubled value. */
+        if (new_bufsize != old_bufsize / 2) {
+            ret = setsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &new_bufsize, len);
+            if (ret == -1)
+                pa_log_warn("Changing bluetooth buffer size: Failed to change from %d to %d: %s", old_bufsize / 2, new_bufsize, pa_cstrerror(errno));
+            else
+                pa_log_info("Changing bluetooth buffer size: Changed from %d to %d", old_bufsize / 2, new_bufsize);
+        }
+    }
+}
+
/* Run from I/O thread */
static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
struct sbc_info *sbc_info;
@@ -694,6 +725,15 @@ static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
pa_sink_set_max_request_within_thread(u->sink, u->write_block_size);
pa_sink_set_fixed_latency_within_thread(u->sink,
FIXED_LATENCY_PLAYBACK_A2DP + pa_bytes_to_usec(u->write_block_size,
&u->sample_spec));
+
+ /* If there is still data in the memchunk, we have to discard it
+ * because the write_block_size may have changed. */
+ if (u->write_memchunk.memblock) {
+ pa_memblock_unref(u->write_memchunk.memblock);
+ pa_memchunk_reset(&u->write_memchunk);
+ }
+
+ update_buffer_size(u);
}
/* Run from I/O thread */
@@ -852,8 +892,10 @@ static void setup_stream(struct userdata *u) {
pa_log_debug("Stream properly set up, we're ready to roll!");
- if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
+ if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
a2dp_set_bitpool(u, u->sbc_info.max_bitpool);
+ update_buffer_size(u);
+ }
u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
@@ -1068,12 +1110,12 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
switch (code) {
case PA_SINK_MESSAGE_GET_LATENCY: {
- int64_t wi, ri;
+ int64_t wi = 0, ri = 0;
if (u->read_smoother) {
ri = pa_smoother_get(u->read_smoother, pa_rtclock_now());
wi = pa_bytes_to_usec(u->write_index + u->write_block_size,
&u->sample_spec);
- } else {
+ } else if (u->started_at) {
ri = pa_rtclock_now() - u->started_at;
wi = pa_bytes_to_usec(u->write_index, &u->sample_spec);
}
@@ -1415,12 +1457,32 @@ static int init_profile(struct userdata *u) {
return r;
}
+/* Render one block and write it to the stream, using the A2DP render path when
+ * u->profile is PA_BLUETOOTH_PROFILE_A2DP_SINK and the SCO render path
+ * otherwise. If nothing has been written so far (u->write_index <= 0), the
+ * current time is recorded in u->started_at first.
+ *
+ * Returns -1 if the render function returned a negative value, otherwise the
+ * render function's return value. A return of 0 is logged as an EAGAIN from
+ * write(). */
+static int write_block(struct userdata *u) {
+    int n_written;
+
+    if (u->write_index <= 0)
+        u->started_at = pa_rtclock_now();
+
+    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
+        n_written = a2dp_process_render(u);
+    else
+        n_written = sco_process_render(u);
+
+    if (n_written < 0)
+        return -1;
+
+    if (n_written == 0)
+        pa_log_debug("Got EAGAIN on write() after POLLOUT, probably there is a temporary connection loss.");
+
+    return n_written;
+}
+
+
/* I/O thread function */
static void thread_func(void *userdata) {
struct userdata *u = userdata;
- unsigned do_write = 0;
- unsigned pending_read_bytes = 0;
- bool writable = false;
+ unsigned blocks_to_write = 0;
+ unsigned bytes_to_write = 0;
pa_assert(u);
pa_assert(u->transport);
@@ -1440,9 +1502,13 @@ static void thread_func(void *userdata) {
struct pollfd *pollfd;
int ret;
bool disable_timer = true;
+ bool writable = false;
+ bool have_source = u->source ?
PA_SOURCE_IS_LINKED(u->source->thread_info.state) : false;
+ bool have_sink = u->sink ?
PA_SINK_IS_LINKED(u->sink->thread_info.state) : false;
pollfd = u->rtpoll_item ? pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL) : NULL;
+ /* Check for stream error or close */
if (pollfd && (pollfd->revents & ~(POLLOUT|POLLIN))) {
pa_log_info("FD error: %s%s%s%s",
pollfd->revents & POLLERR ? "POLLERR " :"",
@@ -1453,147 +1519,174 @@ static void thread_func(void *userdata) {
if (pollfd->revents & POLLHUP) {
pollfd = NULL;
teardown_stream(u);
- do_write = 0;
- pending_read_bytes = 0;
- writable = false;
+ blocks_to_write = 0;
+ bytes_to_write = 0;
pa_asyncmsgq_post(pa_thread_mq_get()->outq,
PA_MSGOBJECT(u->msg), BLUETOOTH_MESSAGE_STREAM_FD_HUP, NULL, 0, NULL, NULL);
} else
goto fail;
}
- if (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state)) {
+ /* If there is a pollfd, the stream is set up and we need to do something */
+ if (pollfd) {
- /* We should send two blocks to the device before we expect
- * a response. */
+ /* Handle source if present */
+ if (have_source) {
- if (u->write_index == 0 && u->read_index <= 0)
- do_write = 2;
+ /* We should send two blocks to the device before we expect a response. */
+ if (u->write_index == 0 && u->read_index <= 0)
+ blocks_to_write = 2;
- if (pollfd && (pollfd->revents & POLLIN)) {
- int n_read;
+ /* If we got woken up by POLLIN let's do some reading */
+ if (pollfd->revents & POLLIN) {
+ int n_read;
- if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE)
- n_read = a2dp_process_push(u);
- else
- n_read = sco_process_push(u);
+ if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE)
+ n_read = a2dp_process_push(u);
+ else
+ n_read = sco_process_push(u);
- if (n_read < 0)
- goto fail;
+ if (n_read < 0)
+ goto fail;
- if (n_read > 0) {
- /* We just read something, so we are supposed to write
something, too */
- pending_read_bytes += n_read;
- do_write += pending_read_bytes / u->write_block_size;
- pending_read_bytes = pending_read_bytes %
u->write_block_size;
+ if (n_read > 0) {
+ /* We just read something, so we are supposed to write something, too */
+ bytes_to_write += n_read;
+ blocks_to_write += bytes_to_write /
u->write_block_size;
+ bytes_to_write = bytes_to_write % u->write_block_size;
+ }
}
}
- }
- if (u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
+ /* Handle sink if present */
+ if (have_sink) {
- if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
- pa_sink_process_rewind(u->sink, 0);
+ /* Process rewinds */
+ if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
+ pa_sink_process_rewind(u->sink, 0);
- if (pollfd) {
+ /* Test if the stream is writable */
if (pollfd->revents & POLLOUT)
writable = true;
- if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0 && writable) {
- pa_usec_t time_passed;
- pa_usec_t audio_sent;
+ /* If we have a source, we let the source determine the timing
+ * for the sink */
+ if (have_source) {
- /* Hmm, there is no input stream we could synchronize
- * to. So let's do things by time */
+ if (writable && blocks_to_write > 0) {
+ int result;
- time_passed = pa_rtclock_now() - u->started_at;
- audio_sent = pa_bytes_to_usec(u->write_index,
&u->sample_spec);
+ if ((result = write_block(u)) < 0)
+ goto fail;
+ blocks_to_write -= result;
+ if (blocks_to_write > 0)
+ writable = false;