Modified: trunk/Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.cpp (281439 => 281440)
--- trunk/Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.cpp 2021-08-23 07:11:41 UTC (rev 281439)
+++ trunk/Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.cpp 2021-08-23 13:22:29 UTC (rev 281440)
@@ -20,6 +20,8 @@
#include "config.h"
#include "AppendPipeline.h"
+#include "AbortableTaskQueue.h"
+#include "MediaSourcePrivateGStreamer.h"
#if ENABLE(VIDEO) && USE(GSTREAMER) && ENABLE(MEDIA_SOURCE)
@@ -102,9 +104,7 @@
AppendPipeline::AppendPipeline(SourceBufferPrivateGStreamer& sourceBufferPrivate, MediaPlayerPrivateGStreamerMSE& playerPrivate)
: m_sourceBufferPrivate(sourceBufferPrivate)
, m_playerPrivate(&playerPrivate)
- , m_id(0)
, m_wasBusAlreadyNotifiedOfAvailableSamples(false)
- , m_streamType(Unknown)
{
ASSERT(isMainThread());
std::call_once(s_staticInitializationFlag, AppendPipeline::staticInitialization);
@@ -150,80 +150,23 @@
else
ASSERT_NOT_REACHED();
- m_appsink = makeGStreamerElement("appsink", nullptr);
-
- gst_app_sink_set_emit_signals(GST_APP_SINK(m_appsink.get()), TRUE);
- gst_base_sink_set_sync(GST_BASE_SINK(m_appsink.get()), FALSE);
- gst_base_sink_set_async_enabled(GST_BASE_SINK(m_appsink.get()), FALSE); // No prerolls, no async state changes.
- gst_base_sink_set_drop_out_of_segment(GST_BASE_SINK(m_appsink.get()), FALSE);
- gst_base_sink_set_last_sample_enabled(GST_BASE_SINK(m_appsink.get()), FALSE);
-
- GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
- g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(+[](GObject*, GParamSpec*, AppendPipeline* appendPipeline) {
- if (isMainThread()) {
- // When changing the pipeline state down to READY the demuxer is unlinked and this triggers a caps notification
- // because the appsink loses its previously negotiated caps. We are not interested in these unnegotiated caps.
-#ifndef NDEBUG
- GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(appendPipeline->m_appsink.get(), "sink"));
- GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));
- ASSERT(!caps);
-#endif
- return;
- }
-
- // The streaming thread has just received a new caps and is about to let samples using the
- // new caps flow. Let's block it until the main thread has consumed the samples with the old
- // caps and has processed the caps change.
- appendPipeline->m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([appendPipeline]() {
- appendPipeline->appsinkCapsChanged();
- return AbortableTaskQueue::Void();
- });
- }), this);
-
#if !LOG_DISABLED
GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
m_demuxerDataEnteringPadProbeInformation.appendPipeline = this;
m_demuxerDataEnteringPadProbeInformation.description = "demuxer data entering";
m_demuxerDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(demuxerPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_demuxerDataEnteringPadProbeInformation, nullptr);
- m_appsinkDataEnteringPadProbeInformation.appendPipeline = this;
- m_appsinkDataEnteringPadProbeInformation.description = "appsink data entering";
- m_appsinkDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_appsinkDataEnteringPadProbeInformation, nullptr);
#endif
-#if ENABLE(ENCRYPTED_MEDIA)
- m_appsinkPadEventProbeInformation.appendPipeline = this;
- m_appsinkPadEventProbeInformation.description = "appsink event probe";
- m_appsinkPadEventProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsinkPadEventProbe), &m_appsinkPadEventProbeInformation, nullptr);
-#endif
-
- // These signals won't be connected outside of the lifetime of "this".
- g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(+[](GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline) {
- appendPipeline->connectDemuxerSrcPadToAppsinkFromStreamingThread(demuxerSrcPad);
- }), this);
- g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(+[](GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline) {
- appendPipeline->disconnectDemuxerSrcPadFromAppsinkFromAnyThread(demuxerSrcPad);
- }), this);
+ // These signal handlers are disconnected in the destructor, so they won't outlive `this`.
g_signal_connect(m_demux.get(), "no-more-pads", G_CALLBACK(+[](GstElement*, AppendPipeline* appendPipeline) {
ASSERT(!isMainThread());
GST_DEBUG("Posting no-more-pads task to main thread");
- appendPipeline->m_taskQueue.enqueueTask([appendPipeline]() {
+ appendPipeline->m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([appendPipeline]() {
appendPipeline->didReceiveInitializationSegment();
+ return AbortableTaskQueue::Void();
});
}), this);
- g_signal_connect(m_appsink.get(), "new-sample", G_CALLBACK(+[](GstElement* appsink, AppendPipeline* appendPipeline) -> GstFlowReturn {
- appendPipeline->handleAppsinkNewSampleFromStreamingThread(appsink);
- return GST_FLOW_OK;
- }), this);
- g_signal_connect(m_appsink.get(), "eos", G_CALLBACK(+[](GstElement*, AppendPipeline* appendPipeline) {
- // Just ignore EOS when having more than one pad. It likely means that one of the pads is
- // going to be removed and the remaining one will be reattached.
- if (appendPipeline->m_errorReceived || appendPipeline->m_demux->numsrcpads > 1)
- return;
- GST_ERROR("AppendPipeline's appsink received EOS. This is usually caused by an invalid initialization segment.");
- appendPipeline->handleErrorConditionFromStreamingThread();
- }), this);
-
// Add_many will take ownership of a reference. That's why we used an assignment before.
gst_bin_add_many(GST_BIN(m_pipeline.get()), m_appsrc.get(), m_demux.get(), nullptr);
gst_element_link(m_appsrc.get(), m_demux.get());
@@ -249,9 +192,6 @@
gst_bus_remove_signal_watch(m_bus.get());
}
- if (m_appsrc)
- g_signal_handlers_disconnect_by_data(m_appsrc.get(), this);
-
if (m_demux) {
#if !LOG_DISABLED
GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
@@ -261,17 +201,15 @@
g_signal_handlers_disconnect_by_data(m_demux.get(), this);
}
- if (m_appsink) {
- GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
+ for (std::unique_ptr<Track>& track : m_tracks) {
+ GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(track->appsink.get(), "sink"));
g_signal_handlers_disconnect_by_data(appsinkPad.get(), this);
- g_signal_handlers_disconnect_by_data(m_appsink.get(), this);
-
+ g_signal_handlers_disconnect_by_data(track->appsink.get(), this);
#if !LOG_DISABLED
- gst_pad_remove_probe(appsinkPad.get(), m_appsinkDataEnteringPadProbeInformation.probeId);
+ gst_pad_remove_probe(appsinkPad.get(), track->appsinkDataEnteringPadProbeInformation.probeId);
#endif
-
#if ENABLE(ENCRYPTED_MEDIA)
- gst_pad_remove_probe(appsinkPad.get(), m_appsinkPadEventProbeInformation.probeId);
+ gst_pad_remove_probe(appsinkPad.get(), track->appsinkPadEventProbeInformation.probeId);
#endif
}
@@ -351,71 +289,39 @@
}
}
-gint AppendPipeline::id()
+std::tuple<GRefPtr<GstCaps>, AppendPipeline::StreamType, FloatSize> AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps)
{
ASSERT(isMainThread());
- if (m_id)
- return m_id;
+ GRefPtr<GstCaps> parsedCaps = demuxerSrcPadCaps;
+ StreamType streamType = StreamType::Unknown;
+ FloatSize presentationSize;
- static gint s_totalAudio = 0;
- static gint s_totalVideo = 0;
- static gint s_totalText = 0;
-
- switch (m_streamType) {
- case Audio:
- m_id = ++s_totalAudio;
- break;
- case Video:
- m_id = ++s_totalVideo;
- break;
- case Text:
- m_id = ++s_totalText;
- break;
- case Unknown:
- case Invalid:
- GST_ERROR("Trying to get id for a pipeline of Unknown/Invalid type");
- ASSERT_NOT_REACHED();
- break;
- }
-
- GST_DEBUG("streamType=%d, id=%d", static_cast<int>(m_streamType), m_id);
-
- return m_id;
-}
-
-void AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps)
-{
- ASSERT(isMainThread());
-
- m_demuxerSrcPadCaps = adoptGRef(demuxerSrcPadCaps);
- m_streamType = MediaSourceStreamTypeGStreamer::Unknown;
-
- const char* originalMediaType = capsMediaType(m_demuxerSrcPadCaps.get());
+ const char* originalMediaType = capsMediaType(demuxerSrcPadCaps);
auto& gstRegistryScanner = GStreamerRegistryScannerMSE::singleton();
if (!gstRegistryScanner.isCodecSupported(GStreamerRegistryScanner::Configuration::Decoding, originalMediaType)) {
- m_presentationSize = FloatSize();
- m_streamType = MediaSourceStreamTypeGStreamer::Invalid;
- } else if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_VIDEO_CAPS_TYPE_PREFIX)) {
- m_presentationSize = getVideoResolutionFromCaps(m_demuxerSrcPadCaps.get()).value_or(FloatSize());
- m_streamType = MediaSourceStreamTypeGStreamer::Video;
+ streamType = StreamType::Invalid;
+ } else if (doCapsHaveType(demuxerSrcPadCaps, GST_VIDEO_CAPS_TYPE_PREFIX)) {
+ presentationSize = getVideoResolutionFromCaps(demuxerSrcPadCaps).value_or(FloatSize());
+ streamType = StreamType::Video;
} else {
- m_presentationSize = FloatSize();
- if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_AUDIO_CAPS_TYPE_PREFIX))
- m_streamType = MediaSourceStreamTypeGStreamer::Audio;
- else if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_TEXT_CAPS_TYPE_PREFIX))
- m_streamType = MediaSourceStreamTypeGStreamer::Text;
+ if (doCapsHaveType(demuxerSrcPadCaps, GST_AUDIO_CAPS_TYPE_PREFIX))
+ streamType = StreamType::Audio;
+ else if (doCapsHaveType(demuxerSrcPadCaps, GST_TEXT_CAPS_TYPE_PREFIX))
+ streamType = StreamType::Text;
}
+
+ return { WTFMove(parsedCaps), streamType, WTFMove(presentationSize) };
}
-void AppendPipeline::appsinkCapsChanged()
+void AppendPipeline::appsinkCapsChanged(Track& track)
{
ASSERT(isMainThread());
// Consume any pending samples with the previous caps.
- consumeAppsinkAvailableSamples();
+ consumeAppsinksAvailableSamples();
- GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
+ GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(track.appsink.get(), "sink"));
GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));
if (!caps)
@@ -424,8 +330,8 @@
// If this is not the first time we're parsing an initialization segment, fail if the track
// has a different codec or type (e.g. if we were previously demuxing an audio stream and
// someone appends a video stream).
- if (m_appsinkCaps && g_strcmp0(capsMediaType(caps.get()), capsMediaType(m_appsinkCaps.get()))) {
- GST_WARNING_OBJECT(m_pipeline.get(), "User appended track metadata with type '%s' for a SourceBuffer previously handling '%s'. Erroring out.", capsMediaType(caps.get()), capsMediaType(m_appsinkCaps.get()));
+ if (track.caps && g_strcmp0(capsMediaType(caps.get()), capsMediaType(track.caps.get()))) {
+ GST_WARNING_OBJECT(m_pipeline.get(), "Track received incompatible caps, received '%s' for a track previously handling '%s'. Erroring out.", capsMediaType(caps.get()), capsMediaType(track.caps.get()));
m_sourceBufferPrivate.appendParsingFailed();
return;
}
@@ -432,24 +338,22 @@
if (doCapsHaveType(caps.get(), GST_VIDEO_CAPS_TYPE_PREFIX)) {
if (auto size = getVideoResolutionFromCaps(caps.get()))
- m_presentationSize = *size;
+ track.presentationSize = *size;
}
- if (m_appsinkCaps != caps) {
- m_appsinkCaps = WTFMove(caps);
- m_playerPrivate->trackDetected(*this, m_track);
- }
+ if (track.caps != caps)
+ track.caps = WTFMove(caps);
}
void AppendPipeline::handleEndOfAppend()
{
ASSERT(isMainThread());
- consumeAppsinkAvailableSamples();
+ consumeAppsinksAvailableSamples();
GST_TRACE_OBJECT(m_pipeline.get(), "Notifying SourceBufferPrivate the append is complete");
sourceBufferPrivate().didReceiveAllPendingSamples();
}
-void AppendPipeline::appsinkNewSample(GRefPtr<GstSample>&& sample)
+void AppendPipeline::appsinkNewSample(const Track& track, GRefPtr<GstSample>&& sample)
{
ASSERT(isMainThread());
@@ -464,7 +368,7 @@
return;
}
- auto mediaSample = MediaSampleGStreamer::create(WTFMove(sample), m_presentationSize, trackId());
+ auto mediaSample = MediaSampleGStreamer::create(WTFMove(sample), track.presentationSize, track.trackId);
GST_TRACE("append: trackId=%s PTS=%s DTS=%s DUR=%s presentationSize=%.0fx%.0f",
mediaSample->trackID().string().utf8().data(),
@@ -499,48 +403,98 @@
{
ASSERT(isMainThread());
+ bool isFirstInitializationSegment = !m_hasReceivedFirstInitializationSegment;
+
SourceBufferPrivateClient::InitializationSegment initializationSegment;
- GST_DEBUG("Notifying SourceBuffer for track %s", (m_track) ? m_track->id().string().utf8().data() : nullptr);
- initializationSegment.duration = m_initialDuration;
+ gint64 timeLength = 0;
+ if (gst_element_query_duration(m_demux.get(), GST_FORMAT_TIME, &timeLength)
+ && static_cast<guint64>(timeLength) != GST_CLOCK_TIME_NONE)
+ initializationSegment.duration = MediaTime(GST_TIME_AS_USECONDS(timeLength), G_USEC_PER_SEC);
+ else
+ initializationSegment.duration = MediaTime::positiveInfiniteTime();
- switch (m_streamType) {
- case Audio: {
- SourceBufferPrivateClient::InitializationSegment::AudioTrackInformation info;
- info.track = static_cast<AudioTrackPrivateGStreamer*>(m_track.get());
- info.description = GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
- initializationSegment.audioTracks.append(info);
- break;
+ if (isFirstInitializationSegment) {
+ // Create a Track object per pad.
+ int trackIndex = 0;
+ for (GstPad* pad : GstIteratorAdaptor<GstPad>(GUniquePtr<GstIterator>(gst_element_iterate_src_pads(m_demux.get())))) {
+ auto [createTrackResult, track] = tryCreateTrackFromPad(pad, trackIndex);
+ if (createTrackResult == CreateTrackResult::AppendParsingFailed) {
+ // appendParsingFailed() will immediately cause a resetParserState() which will stop demuxing, then the
+ // AppendPipeline will be destroyed.
+ m_sourceBufferPrivate.appendParsingFailed();
+ return;
+ }
+ if (track)
+ linkPadWithTrack(pad, *track);
+ trackIndex++;
+ }
+ } else {
+ // Link pads to existing Track objects that don't have a linked pad yet.
+ unsigned countPads = 0;
+ for (GstPad* pad : GstIteratorAdaptor<GstPad>(GUniquePtr<GstIterator>(gst_element_iterate_src_pads(m_demux.get())))) {
+ countPads++;
+ Track* track = tryMatchPadToExistingTrack(pad);
+ if (!track) {
+ GST_WARNING_OBJECT(pipeline(), "Can't match pad to existing tracks in the AppendPipeline: %" GST_PTR_FORMAT, pad);
+ m_sourceBufferPrivate.appendParsingFailed();
+ return;
+ }
+ linkPadWithTrack(pad, *track);
+ }
+ if (countPads != m_tracks.size()) {
+ GST_WARNING_OBJECT(pipeline(), "Number of pads (%u) doesn't match number of tracks (%zu).", countPads, m_tracks.size());
+ m_sourceBufferPrivate.appendParsingFailed();
+ return;
+ }
}
- case Video: {
- SourceBufferPrivateClient::InitializationSegment::VideoTrackInformation info;
- info.track = static_cast<VideoTrackPrivateGStreamer*>(m_track.get());
- info.description = GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
- initializationSegment.videoTracks.append(info);
- break;
+
+ for (std::unique_ptr<Track>& track : m_tracks) {
+ GST_DEBUG_OBJECT(pipeline(), "Adding track to initialization with segment type %s, id %s.", streamTypeToString(track->streamType), track->trackId.string().utf8().data());
+ switch (track->streamType) {
+ case Audio: {
+ ASSERT(track->webKitTrack);
+ SourceBufferPrivateClient::InitializationSegment::AudioTrackInformation info;
+ info.track = static_cast<AudioTrackPrivateGStreamer*>(track->webKitTrack.get());
+ info.description = GStreamerMediaDescription::create(track->caps.get());
+ initializationSegment.audioTracks.append(info);
+ break;
+ }
+ case Video: {
+ ASSERT(track->webKitTrack);
+ SourceBufferPrivateClient::InitializationSegment::VideoTrackInformation info;
+ info.track = static_cast<VideoTrackPrivateGStreamer*>(track->webKitTrack.get());
+ info.description = GStreamerMediaDescription::create(track->caps.get());
+ initializationSegment.videoTracks.append(info);
+ break;
+ }
+ default:
+ GST_ERROR("Unsupported stream type or codec");
+ break;
+ }
}
- default:
- GST_ERROR("Unsupported stream type or codec");
- break;
+
+ if (isFirstInitializationSegment) {
+ for (std::unique_ptr<Track>& track : m_tracks) {
+ if (track->streamType == StreamType::Video) {
+ GST_DEBUG_OBJECT(pipeline(), "Setting initial video size to that of track with id '%s', %gx%g.",
+ track->trackId.string().utf8().data(), static_cast<double>(track->presentationSize.width()), static_cast<double>(track->presentationSize.height()));
+ m_playerPrivate->setInitialVideoSize(track->presentationSize);
+ break;
+ }
+ }
}
+ m_hasReceivedFirstInitializationSegment = true;
+ GST_DEBUG("Notifying SourceBuffer of initialization segment.");
+ GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "append-pipeline-received-init-segment");
m_sourceBufferPrivate.didReceiveInitializationSegment(WTFMove(initializationSegment), []() { });
}
-AtomString AppendPipeline::trackId()
+void AppendPipeline::consumeAppsinksAvailableSamples()
{
ASSERT(isMainThread());
- if (!m_track)
- return AtomString();
-
- return m_track->id();
-}
-
-void AppendPipeline::consumeAppsinkAvailableSamples()
-{
- ASSERT(isMainThread());
-
GRefPtr<GstSample> sample;
int batchedSampleCount = 0;
// In some cases each frame increases the duration of the movie.
@@ -547,9 +501,11 @@
// Batch duration changes so that if we pick 100 of such samples we don't have to run 100 times
// layout for the video controls, but only once.
m_playerPrivate->blockDurationChanges();
- while ((sample = adoptGRef(gst_app_sink_try_pull_sample(GST_APP_SINK(m_appsink.get()), 0)))) {
- appsinkNewSample(WTFMove(sample));
- batchedSampleCount++;
+ for (std::unique_ptr<Track>& track : m_tracks) {
+ while ((sample = adoptGRef(gst_app_sink_try_pull_sample(GST_APP_SINK(track->appsink.get()), 0)))) {
+ appsinkNewSample(*track, WTFMove(sample));
+ batchedSampleCount++;
+ }
}
m_playerPrivate->unblockDurationChanges();
@@ -573,9 +529,6 @@
// Reset the state of all elements in the pipeline.
assertedElementSetState(m_pipeline.get(), GST_STATE_READY);
- // The parser is tear down automatically when the demuxer is reset (see disconnectDemuxerSrcPadFromAppsinkFromAnyThread()).
- ASSERT(!m_parser);
-
// Set the pipeline to PLAYING so that it can be used again.
assertedElementSetState(m_pipeline.get(), GST_STATE_PLAYING);
@@ -642,20 +595,18 @@
GST_TRACE("Posting appsink-new-sample task to the main thread");
m_taskQueue.enqueueTask([this]() {
m_wasBusAlreadyNotifiedOfAvailableSamples.clear();
- consumeAppsinkAvailableSamples();
+ consumeAppsinksAvailableSamples();
});
}
}
static GRefPtr<GstElement>
-createOptionalParserForFormat(GstPad* demuxerSrcPad)
+createOptionalParserForFormat(const AtomString& trackId, const GstCaps* caps)
{
- GRefPtr<GstCaps> padCaps = adoptGRef(gst_pad_get_current_caps(demuxerSrcPad));
- GstStructure* structure = gst_caps_get_structure(padCaps.get(), 0);
+ GstStructure* structure = gst_caps_get_structure(caps, 0);
const char* mediaType = gst_structure_get_name(structure);
- GUniquePtr<char> demuxerPadName(gst_pad_get_name(demuxerSrcPad));
- GUniquePtr<char> parserName(g_strdup_printf("%s_parser", demuxerPadName.get()));
+ GUniquePtr<char> parserName(g_strdup_printf("%s_parser", trackId.string().utf8().data()));
if (!g_strcmp0(mediaType, "audio/x-opus")) {
GstElement* opusparse = makeGStreamerElement("opusparse", parserName.get());
@@ -669,220 +620,230 @@
return nullptr;
}
-void AppendPipeline::connectDemuxerSrcPadToAppsinkFromStreamingThread(GstPad* demuxerSrcPad)
+AtomString AppendPipeline::generateTrackId(StreamType streamType, int padIndex)
{
- ASSERT(!isMainThread());
-
- GST_DEBUG("connecting to appsink");
-
- if (m_demux->numsrcpads > 1) {
- GST_WARNING("Only one stream per SourceBuffer is allowed! Ignoring stream %d by adding a black hole probe.", m_demux->numsrcpads);
- gulong probeId = gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineDemuxerBlackHolePadProbe), nullptr, nullptr);
- g_object_set_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId", GULONG_TO_POINTER(probeId));
- return;
+ switch (streamType) {
+ case Audio:
+ return makeString("A", padIndex);
+ case Video:
+ return makeString("V", padIndex);
+ case Text:
+ return makeString("T", padIndex);
+ default:
+ return makeString("O", padIndex);
}
-
- GRefPtr<GstPad> appsinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
-
- // Only one stream per demuxer is supported.
- ASSERT(!gst_pad_is_linked(appsinkSinkPad.get()));
-
- gint64 timeLength = 0;
- if (gst_element_query_duration(m_demux.get(), GST_FORMAT_TIME, &timeLength)
- && static_cast<guint64>(timeLength) != GST_CLOCK_TIME_NONE)
- m_initialDuration = MediaTime(GST_TIME_AS_USECONDS(timeLength), G_USEC_PER_SEC);
- else
- m_initialDuration = MediaTime::positiveInfiniteTime();
-
- GST_DEBUG("Requesting demuxer-connect-to-appsink to main thread");
- auto response = m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([this, demuxerSrcPad]() {
- connectDemuxerSrcPadToAppsink(demuxerSrcPad);
- return AbortableTaskQueue::Void();
- });
- if (!response) {
- // The AppendPipeline has been destroyed or aborted before we received a response.
- return;
- }
-
- // Must be done in the thread we were called from (usually streaming thread).
- bool isData = (m_streamType == MediaSourceStreamTypeGStreamer::Audio)
- || (m_streamType == MediaSourceStreamTypeGStreamer::Video)
- || (m_streamType == MediaSourceStreamTypeGStreamer::Text);
-
- if (isData) {
- GRefPtr<GstObject> parent = adoptGRef(gst_element_get_parent(m_appsink.get()));
- if (!parent)
- gst_bin_add(GST_BIN(m_pipeline.get()), m_appsink.get());
-
- // Current head of the pipeline being built.
- GRefPtr<GstPad> currentSrcPad = demuxerSrcPad;
-
- // Some audio files unhelpfully omit the duration of frames in the container. We need to parse
- // the contained audio streams in order to know the duration of the frames.
- // This is known to be an issue with YouTube WebM files containing Opus audio as of YTTV2018.
- m_parser = createOptionalParserForFormat(currentSrcPad.get());
- if (m_parser) {
- gst_bin_add(GST_BIN(m_pipeline.get()), m_parser.get());
- gst_element_sync_state_with_parent(m_parser.get());
-
- GRefPtr<GstPad> parserSinkPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "sink"));
- GRefPtr<GstPad> parserSrcPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "src"));
-
- gst_pad_link(currentSrcPad.get(), parserSinkPad.get());
- currentSrcPad = parserSrcPad;
- }
-
- gst_pad_link(currentSrcPad.get(), appsinkSinkPad.get());
-
- gst_element_sync_state_with_parent(m_appsink.get());
-
- GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "webkit-after-link");
- }
}
-void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
+std::pair<AppendPipeline::CreateTrackResult, AppendPipeline::Track*> AppendPipeline::tryCreateTrackFromPad(GstPad* demuxerSrcPad, int trackIndex)
{
ASSERT(isMainThread());
- GST_DEBUG("Connecting to appsink");
+ ASSERT(!m_hasReceivedFirstInitializationSegment);
+ GST_DEBUG_OBJECT(pipeline(), "Creating Track object for pad %" GST_PTR_FORMAT, demuxerSrcPad);
const String& type = m_sourceBufferPrivate.type().containerType();
if (type.endsWith("webm"))
gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, matroskademuxForceSegmentStartToEqualZero, nullptr, nullptr);
- GRefPtr<GstPad> sinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
-
- // Only one stream per demuxer is supported.
- ASSERT(!gst_pad_is_linked(sinkSinkPad.get()));
-
- // As it is now, resetParserState() will cause the pads to be disconnected, so they will later be re-added on the next initialization segment.
-
- GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(GST_PAD(demuxerSrcPad)));
-
+ auto [parsedCaps, streamType, presentationSize] = parseDemuxerSrcPadCaps(adoptGRef(gst_pad_get_current_caps(demuxerSrcPad)).get());
#ifndef GST_DISABLE_GST_DEBUG
{
- GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
+ GUniquePtr<gchar> strcaps(gst_caps_to_string(parsedCaps.get()));
GST_DEBUG("%s", strcaps.get());
}
#endif
- parseDemuxerSrcPadCaps(gst_caps_ref(caps.get()));
+ if (streamType == StreamType::Invalid) {
+ GST_WARNING_OBJECT(m_pipeline.get(), "Unsupported track codec: %" GST_PTR_FORMAT, parsedCaps.get());
+ // 3.5.7 Initialization Segment Received
+ // 5.1. If the initialization segment contains tracks with codecs the user agent does not support, then run the
+ // append error algorithm and abort these steps.
+ return { CreateTrackResult::AppendParsingFailed, nullptr };
+ }
+ if (streamType == StreamType::Unknown) {
+ GST_WARNING_OBJECT(pipeline(), "Pad '%s' with parsed caps %" GST_PTR_FORMAT " has an unknown type, will be connected to a black hole probe.", GST_PAD_NAME(demuxerSrcPad), parsedCaps.get());
+ gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineDemuxerBlackHolePadProbe), nullptr, nullptr);
+ return { CreateTrackResult::TrackIgnored, nullptr };
+ }
+ AtomString trackId = generateTrackId(streamType, trackIndex);
- TrackPrivateBaseGStreamer* gstreamerTrack;
- switch (m_streamType) {
- case MediaSourceStreamTypeGStreamer::Audio: {
- auto specificTrack = AudioTrackPrivateGStreamer::create(makeWeakPtr(*m_playerPrivate), id(), sinkSinkPad.get());
+ GST_DEBUG_OBJECT(pipeline(), "Creating new AppendPipeline::Track with id '%s'", trackId.string().utf8().data());
+ size_t newTrackIndex = m_tracks.size();
+ m_tracks.append(WTF::makeUnique<Track>(trackId, streamType, parsedCaps, presentationSize));
+ Track& track = *m_tracks.at(newTrackIndex);
+ track.initializeElements(this, GST_BIN(m_pipeline.get()));
+ track.webKitTrack = makeWebKitTrack(newTrackIndex);
+ hookTrackEvents(track);
+ return { CreateTrackResult::TrackCreated, &track };
+}
+
+AppendPipeline::Track* AppendPipeline::tryMatchPadToExistingTrack(GstPad *demuxerSrcPad)
+{
+ ASSERT(isMainThread());
+ ASSERT(m_hasReceivedFirstInitializationSegment);
+ AtomString trackId = GST_PAD_NAME(demuxerSrcPad);
+ auto [parsedCaps, streamType, presentationSize] = parseDemuxerSrcPadCaps(adoptGRef(gst_pad_get_current_caps(demuxerSrcPad)).get());
+
+ // Try to find a matching pre-existing track. Ideally tracks would be matched by track ID, but we currently
+ // have no way to fetch track IDs from GStreamer, so the type-based fallback below is what is actually used.
+ Track* matchingTrack = nullptr;
+ for (std::unique_ptr<Track>& track : m_tracks) {
+ if (track->streamType != streamType || gst_pad_is_linked(track->entryPad.get()))
+ continue;
+ matchingTrack = &*track;
+ if (track->trackId == trackId)
+ break;
+ }
+
+ if (!matchingTrack) {
+ // Invalid configuration.
+ GST_WARNING_OBJECT(pipeline(), "Couldn't find a matching pre-existing track for pad '%s' with parsed caps %" GST_PTR_FORMAT
+ " on non-first initialization segment, will be connected to a black hole probe.", GST_PAD_NAME(demuxerSrcPad), parsedCaps.get());
+ gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineDemuxerBlackHolePadProbe), nullptr, nullptr);
+ }
+ return matchingTrack;
+}
+
+void AppendPipeline::linkPadWithTrack(GstPad* demuxerSrcPad, Track& track)
+{
+ GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "append-pipeline-before-link");
+ ASSERT(!GST_PAD_IS_LINKED(track.entryPad.get()));
+ gst_pad_link(demuxerSrcPad, track.entryPad.get());
+ ASSERT(GST_PAD_IS_LINKED(track.entryPad.get()));
+ GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "append-pipeline-after-link");
+}
+
+Ref<WebCore::TrackPrivateBase> AppendPipeline::makeWebKitTrack(int trackIndex)
+{
+ Track& appendPipelineTrack = *m_tracks.at(trackIndex);
+
+ RefPtr<WebCore::TrackPrivateBase> track;
+ TrackPrivateBaseGStreamer* gstreamerTrack = nullptr;
+ // FIXME: AudioTrackPrivateGStreamer etc. should probably use pads of the playback pipeline rather than the append pipeline.
+ switch (appendPipelineTrack.streamType) {
+ case StreamType::Audio: {
+ auto specificTrack = AudioTrackPrivateGStreamer::create(makeWeakPtr(m_playerPrivate), trackIndex, appendPipelineTrack.appsinkPad);
gstreamerTrack = specificTrack.ptr();
- m_track = makeRefPtr(static_cast<TrackPrivateBase*>(specificTrack.ptr()));
+ track = makeRefPtr(static_cast<TrackPrivateBase*>(specificTrack.ptr()));
break;
}
- case MediaSourceStreamTypeGStreamer::Video: {
- auto specificTrack = VideoTrackPrivateGStreamer::create(makeWeakPtr(*m_playerPrivate), id(), sinkSinkPad.get());
+ case StreamType::Video: {
+ auto specificTrack = VideoTrackPrivateGStreamer::create(makeWeakPtr(m_playerPrivate), trackIndex, appendPipelineTrack.appsinkPad);
gstreamerTrack = specificTrack.ptr();
- m_track = makeRefPtr(static_cast<TrackPrivateBase*>(specificTrack.ptr()));
+ track = makeRefPtr(static_cast<TrackPrivateBase*>(specificTrack.ptr()));
break;
}
- case MediaSourceStreamTypeGStreamer::Text: {
- auto specificTrack = InbandTextTrackPrivateGStreamer::create(id(), sinkSinkPad.get());
+ case StreamType::Text: {
+ auto specificTrack = InbandTextTrackPrivateGStreamer::create(trackIndex, appendPipelineTrack.appsinkPad);
gstreamerTrack = specificTrack.ptr();
- m_track = makeRefPtr(static_cast<TrackPrivateBase*>(specificTrack.ptr()));
+ track = makeRefPtr(static_cast<TrackPrivateBase*>(specificTrack.ptr()));
break;
}
- case MediaSourceStreamTypeGStreamer::Invalid:
- GST_WARNING_OBJECT(m_pipeline.get(), "Unsupported track codec: %" GST_PTR_FORMAT, caps.get());
- // 3.5.7 Initialization Segment Received
- // 5.1. If the initialization segment contains tracks with codecs the user agent does not support, then run the
- // append error algorithm and abort these steps.
-
- // appendParsingFailed() will immediately cause a resetParserState() which will stop demuxing, then the
- // AppendPipeline will be destroyed.
- m_sourceBufferPrivate.appendParsingFailed();
- return;
default:
- GST_WARNING_OBJECT(m_pipeline.get(), "Pad has unknown track type, ignoring: %" GST_PTR_FORMAT, caps.get());
- return;
+ ASSERT_NOT_REACHED();
}
- gstreamerTrack->setInitialCaps(GRefPtr(caps));
-
- m_appsinkCaps = WTFMove(caps);
- m_playerPrivate->trackDetected(*this, m_track);
+ ASSERT(appendPipelineTrack.caps.get());
+ gstreamerTrack->setInitialCaps(appendPipelineTrack.caps.get());
+ return track.releaseNonNull();
}
-void AppendPipeline::disconnectDemuxerSrcPadFromAppsinkFromAnyThread(GstPad* demuxerSrcPad)
+void AppendPipeline::Track::initializeElements(AppendPipeline* appendPipeline, GstBin* bin)
{
- // Note: This function can be called either from the streaming thread (e.g. if a strange initialization segment with
- // incompatible tracks is appended and the srcpad disconnected) or -- more usually -- from the main thread, when
- // a state change is made to bring the demuxer down. (State change operations run in the main thread.)
- GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-before");
+ appsink = makeGStreamerElement("appsink", nullptr);
+ gst_app_sink_set_emit_signals(GST_APP_SINK(appsink.get()), TRUE);
+ gst_base_sink_set_sync(GST_BASE_SINK(appsink.get()), FALSE);
+ gst_base_sink_set_async_enabled(GST_BASE_SINK(appsink.get()), FALSE); // No prerolls, no async state changes.
+ gst_base_sink_set_drop_out_of_segment(GST_BASE_SINK(appsink.get()), FALSE);
+ gst_base_sink_set_last_sample_enabled(GST_BASE_SINK(appsink.get()), FALSE);
- // Reconnect the other pad if it's the only remaining after removing this one and wasn't connected yet (has a black hole probe).
- if (m_demux->numsrcpads == 1) {
- auto remainingPad = GST_PAD(m_demux->srcpads->data);
+ gst_bin_add(GST_BIN(appendPipeline->pipeline()), appsink.get());
+ gst_element_sync_state_with_parent(appsink.get());
+ entryPad = appsinkPad = adoptGRef(gst_element_get_static_pad(appsink.get(), "sink"));
- auto probeId = GPOINTER_TO_ULONG(g_object_get_data(G_OBJECT(remainingPad), "blackHoleProbeId"));
- if (remainingPad && probeId) {
- auto oldPeerPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
- while (gst_pad_is_linked(oldPeerPad.get())) {
- // Get sink pad of the parser before appsink.
- // All the expected elements between the demuxer and appsink are supposed to have pads named "sink".
- oldPeerPad = adoptGRef(gst_pad_get_peer(oldPeerPad.get()));
- auto element = adoptGRef(gst_pad_get_parent_element(oldPeerPad.get()));
- oldPeerPad = adoptGRef(gst_element_get_static_pad(element.get(), "sink"));
- ASSERT(oldPeerPad);
- }
+#if !LOG_DISABLED
+ appsinkDataEnteringPadProbeInformation.appendPipeline = appendPipeline;
+ appsinkDataEnteringPadProbeInformation.description = "appsink data entering";
+ appsinkDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &appsinkDataEnteringPadProbeInformation, nullptr);
+#endif
- gst_pad_remove_probe(remainingPad, probeId);
+#if ENABLE(ENCRYPTED_MEDIA)
+ appsinkPadEventProbeInformation.appendPipeline = appendPipeline;
+ appsinkPadEventProbeInformation.description = "appsink event probe";
+ appsinkPadEventProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsinkPadEventProbe), &appsinkPadEventProbeInformation, nullptr);
+#endif
- auto oldPeerPadCaps = adoptGRef(gst_pad_get_current_caps(oldPeerPad.get()));
- auto remainingPadCaps = adoptGRef(gst_pad_get_current_caps(remainingPad));
- const char* oldPeerPadType = nullptr;
- const char* remainingPadType = nullptr;
+ // Some audio files unhelpfully omit the duration of frames in the container. We need to parse
+ // the contained audio streams in order to know the duration of the frames.
+ // This is known to be an issue with YouTube WebM files containing Opus audio as of YTTV2018.
+ if ((parser = createOptionalParserForFormat(trackId, caps.get()))) {
+ gst_bin_add(bin, parser.get());
+ gst_element_sync_state_with_parent(parser.get());
+ gst_element_link(parser.get(), appsink.get());
+ ASSERT(GST_PAD_IS_LINKED(appsinkPad.get()));
+ entryPad = adoptGRef(gst_element_get_static_pad(parser.get(), "sink"));
+ }
+}
- if (oldPeerPadCaps) {
- auto oldPeerPadCapsStructure = gst_caps_get_structure(oldPeerPadCaps.get(), 0);
- if (oldPeerPadCapsStructure)
- oldPeerPadType = gst_structure_get_name(oldPeerPadCapsStructure);
- }
- if (remainingPadCaps) {
- auto remainingPadCapsStructure = gst_caps_get_structure(remainingPadCaps.get(), 0);
- if (remainingPadCapsStructure)
- remainingPadType = gst_structure_get_name(remainingPadCapsStructure);
- }
+void AppendPipeline::hookTrackEvents(Track& track)
+{
+ g_signal_connect(track.appsink.get(), "new-sample", G_CALLBACK(+[](GstElement* appsink, AppendPipeline* appendPipeline) -> GstFlowReturn {
+ appendPipeline->handleAppsinkNewSampleFromStreamingThread(appsink);
+ return GST_FLOW_OK;
+ }), this);
- if (g_strcmp0(oldPeerPadType, remainingPadType)) {
- GST_ERROR("The remaining pad has a blackHoleProbe, but can't reconnect as main pad because the caps types are incompatible: oldPeerPadCaps: %" GST_PTR_FORMAT ", remainingPadCaps: %" GST_PTR_FORMAT, oldPeerPadCaps.get(), remainingPadCaps.get());
- if (!isMainThread())
- handleErrorConditionFromStreamingThread();
- else
- m_sourceBufferPrivate.appendParsingFailed();
- return;
- }
+ struct Closure {
+ public:
- GST_DEBUG("The remaining pad has a blackHoleProbe, reconnecting as main pad. oldPad: %" GST_PTR_FORMAT ", newPad: %" GST_PTR_FORMAT ", peerPad: %" GST_PTR_FORMAT, demuxerSrcPad, remainingPad, oldPeerPad.get());
+ Closure(AppendPipeline& appendPipeline, Track& track)
+ : appendPipeline(appendPipeline)
+ , track(track)
+ { }
+ static void destruct(void* closure, GClosure*) { delete static_cast<Closure*>(closure); }
- gst_pad_link(remainingPad, oldPeerPad.get());
- if (m_parser)
- gst_element_set_state(m_parser.get(), GST_STATE_NULL);
- gst_element_set_state(m_appsink.get(), GST_STATE_NULL);
- gst_element_set_state(m_appsink.get(), GST_STATE_PLAYING);
- if (m_parser)
- gst_element_set_state(m_parser.get(), GST_STATE_PLAYING);
+ AppendPipeline& appendPipeline;
+ Track& track;
+ };
- GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "webkit-after-relink");
-
+ g_signal_connect_data(track.appsinkPad.get(), "notify::caps", G_CALLBACK(+[](GObject*, GParamSpec*, Closure* closure) {
+ AppendPipeline& appendPipeline = closure->appendPipeline;
+ Track& track = closure->track;
+ if (isMainThread()) {
+ // When changing the pipeline state down to READY the demuxer is unlinked and this triggers a caps notification
+ // because the appsink loses its previously negotiated caps. We are not interested in these unnegotiated caps.
+#ifndef NDEBUG
+ GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(track.appsink.get(), "sink"));
+ GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));
+ ASSERT(!caps);
+#endif
return;
}
- }
- GST_DEBUG("Disconnecting appsink");
+ // The streaming thread has just received a new caps and is about to let samples using the
+ // new caps flow. Let's block it until the main thread has consumed the samples with the old
+ // caps and has processed the caps change.
+ appendPipeline.m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([&appendPipeline, &track]() {
+ appendPipeline.appsinkCapsChanged(track);
+ return AbortableTaskQueue::Void();
+ });
+ }), new Closure { *this, track }, Closure::destruct, static_cast<GConnectFlags>(0));
+}
- if (m_parser) {
- assertedElementSetState(m_parser.get(), GST_STATE_NULL);
- gst_bin_remove(GST_BIN(m_pipeline.get()), m_parser.get());
- m_parser = nullptr;
+#ifndef GST_DISABLE_GST_DEBUG
+const char* AppendPipeline::streamTypeToString(StreamType streamType)
+{
+ switch (streamType) {
+ case StreamType::Audio:
+ return "Audio";
+ case StreamType::Video:
+ return "Video";
+ case StreamType::Text:
+ return "Text";
+ case StreamType::Invalid:
+ return "Invalid";
+ case StreamType::Unknown:
+ return "Unknown";
}
-
- GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-after");
}
+#endif
#if !LOG_DISABLED
static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation* padProbeInformation)