From b3ef959b25a25f4657fc761d0ca27dde50dbfec5 Mon Sep 17 00:00:00 2001 From: Vladislav Mikhalin Date: Thu, 15 Aug 2024 21:59:59 +0300 Subject: [PATCH] Fixed threading, migrated to CVs, added looping --- .gitmodules | 2 +- src/core/libraries/avplayer/avplayer.cpp | 7 +- src/core/libraries/avplayer/avplayer_impl.cpp | 7 ++ src/core/libraries/avplayer/avplayer_impl.h | 1 + .../libraries/avplayer/avplayer_source.cpp | 103 +++++++++++------- src/core/libraries/avplayer/avplayer_source.h | 47 +++++++- .../libraries/avplayer/avplayer_state.cpp | 40 +++++-- src/core/libraries/avplayer/avplayer_state.h | 3 + 8 files changed, 161 insertions(+), 49 deletions(-) diff --git a/.gitmodules b/.gitmodules index e96f33ec..60fb5fbb 100644 --- a/.gitmodules +++ b/.gitmodules @@ -63,4 +63,4 @@ url = https://github.com/HowardHinnant/date.git [submodule "externals/ffmpeg-core"] path = externals/ffmpeg-core - url = https://github.com/RPCS3/ffmpeg-core.git + url = https://github.com/shadps4-emu/ext-ffmpeg-core diff --git a/src/core/libraries/avplayer/avplayer.cpp b/src/core/libraries/avplayer/avplayer.cpp index 8a1152a3..78e03d78 100644 --- a/src/core/libraries/avplayer/avplayer.cpp +++ b/src/core/libraries/avplayer/avplayer.cpp @@ -230,6 +230,9 @@ s32 PS4_SYSV_ABI sceAvPlayerSetLooping(SceAvPlayerHandle handle, bool loop_flag) if (handle == nullptr) { return ORBIS_AVPLAYER_ERROR_INVALID_PARAMS; } + if (!handle->SetLooping(loop_flag)) { + return ORBIS_AVPLAYER_ERROR_OPERATION_FAILED; + } return ORBIS_OK; } @@ -256,7 +259,9 @@ s32 PS4_SYSV_ABI sceAvPlayerStop(SceAvPlayerHandle handle) { if (handle == nullptr) { return ORBIS_AVPLAYER_ERROR_INVALID_PARAMS; } - return handle->Stop(); + const auto res = handle->Stop(); + LOG_TRACE(Lib_AvPlayer, "returning {}", res); + return res; } s32 PS4_SYSV_ABI sceAvPlayerStreamCount(SceAvPlayerHandle handle) { diff --git a/src/core/libraries/avplayer/avplayer_impl.cpp b/src/core/libraries/avplayer/avplayer_impl.cpp index 1114254f..cdfff827 100644 --- a/src/core/libraries/avplayer/avplayer_impl.cpp +++ b/src/core/libraries/avplayer/avplayer_impl.cpp @@ -190,4 +190,11 @@ s32 AvPlayer::Stop() { return ORBIS_OK; } +bool AvPlayer::SetLooping(bool is_looping) { + if (m_state == nullptr) { + return false; + } + return m_state->SetLooping(is_looping); +} + } // namespace Libraries::AvPlayer diff --git a/src/core/libraries/avplayer/avplayer_impl.h b/src/core/libraries/avplayer/avplayer_impl.h index df4acfee..09989d39 100644 --- a/src/core/libraries/avplayer/avplayer_impl.h +++ b/src/core/libraries/avplayer/avplayer_impl.h @@ -37,6 +37,7 @@ public: bool IsActive(); u64 CurrentTime(); s32 Stop(); + bool SetLooping(bool is_looping); private: using ScePthreadMutex = Kernel::ScePthreadMutex; diff --git a/src/core/libraries/avplayer/avplayer_source.cpp b/src/core/libraries/avplayer/avplayer_source.cpp index e235b2c2..964e0fbd 100644 --- a/src/core/libraries/avplayer/avplayer_source.cpp +++ b/src/core/libraries/avplayer/avplayer_source.cpp @@ -27,7 +27,8 @@ AvPlayerSource::AvPlayerSource(AvPlayerStateCallback& state, std::string_view pa const SceAvPlayerInitData& init_data, SceAvPlayerSourceType source_type) : m_state(state), m_memory_replacement(init_data.memory_replacement), - m_num_output_video_framebuffers(init_data.num_output_video_framebuffers) { + m_num_output_video_framebuffers( + std::min(std::max(2, init_data.num_output_video_framebuffers), 16)) { AVFormatContext* context = avformat_alloc_context(); if (init_data.file_replacement.open != nullptr) { m_up_data_streamer = @@ -208,7 +209,7 @@ void AvPlayerSource::SetLooping(bool is_looping) { } std::optional AvPlayerSource::HasFrames(u32 num_frames) { - return m_video_frames.Size() > num_frames || m_is_eof; + return m_video_packets.Size() > num_frames || m_is_eof; } s32 AvPlayerSource::Start() { @@ -255,6 +256,7 @@ bool AvPlayerSource::Stop() { m_video_buffers.Push(std::move(m_current_video_frame.value())); m_current_video_frame.reset(); } + m_stop_cv.Notify(); m_audio_packets.Clear(); m_video_packets.Clear(); @@ -291,30 +293,30 @@ bool AvPlayerSource::GetVideoData(SceAvPlayerFrameInfoEx& video_info) { return false; } - using namespace std::chrono; - while (m_video_frames.Size() == 0 && !m_is_eof) { - std::this_thread::sleep_for(milliseconds(5)); - } + m_video_frames_cv.Wait([this]() { return m_video_frames.Size() != 0 || m_is_eof; }); auto frame = m_video_frames.Pop(); if (!frame.has_value()) { - LOG_WARNING(Lib_AvPlayer, "Could get video frame: no frames."); + LOG_WARNING(Lib_AvPlayer, "Could get video frame. EOF reached."); return false; } { + using namespace std::chrono; auto elapsed_time = duration_cast(high_resolution_clock::now() - m_start_time).count(); - while (elapsed_time < frame->info.timestamp) { - std::this_thread::sleep_for(milliseconds(frame->info.timestamp - elapsed_time)); - elapsed_time = - duration_cast(high_resolution_clock::now() - m_start_time).count(); + if (elapsed_time < frame->info.timestamp) { + if (m_stop_cv.WaitFor(milliseconds(frame->info.timestamp - elapsed_time), + [&]() { return elapsed_time >= frame->info.timestamp; })) { + return false; + } } } // return the buffer to the queue if (m_current_video_frame.has_value()) { m_video_buffers.Push(std::move(m_current_video_frame.value())); + m_video_buffers_cv.Notify(); } m_current_video_frame = std::move(frame->buffer); video_info = frame->info; @@ -326,30 +328,30 @@ bool AvPlayerSource::GetAudioData(SceAvPlayerFrameInfo& audio_info) { return false; } - using namespace std::chrono; - while (m_audio_frames.Size() == 0 && !m_is_eof) { - std::this_thread::sleep_for(milliseconds(5)); - } + m_audio_frames_cv.Wait([this]() { return m_audio_frames.Size() != 0 || m_is_eof; }); auto frame = m_audio_frames.Pop(); if (!frame.has_value()) { - LOG_WARNING(Lib_AvPlayer, "Could get audio frame: no frames."); + LOG_WARNING(Lib_AvPlayer, "Could get audio frame. EOF reached."); return false; } { + using namespace std::chrono; auto elapsed_time = duration_cast(high_resolution_clock::now() - m_start_time).count(); - while (elapsed_time < frame->info.timestamp) { - std::this_thread::sleep_for(milliseconds(frame->info.timestamp - elapsed_time)); - elapsed_time = - duration_cast(high_resolution_clock::now() - m_start_time).count(); + if (elapsed_time < frame->info.timestamp) { + if (m_stop_cv.WaitFor(milliseconds(frame->info.timestamp - elapsed_time), + [&]() { return elapsed_time >= frame->info.timestamp; })) { + return false; + } } } // return the buffer to the queue if (m_current_audio_frame.has_value()) { m_audio_buffers.Push(std::move(m_current_audio_frame.value())); + m_audio_buffers_cv.Notify(); } m_current_audio_frame = std::move(frame->buffer); @@ -424,8 +426,26 @@ void AvPlayerSource::DemuxerThread(std::stop_token stop) { const auto res = av_read_frame(m_avformat_context.get(), up_packet.get()); if (res < 0) { if (res == AVERROR_EOF) { - LOG_INFO(Lib_AvPlayer, "EOF reached in demuxer"); - break; + if (m_is_looping) { + LOG_INFO(Lib_AvPlayer, "EOF reached in demuxer. Looping the source..."); + avio_seek(m_avformat_context->pb, 0, SEEK_SET); + if (m_video_stream_index.has_value()) { + const auto index = m_video_stream_index.value(); + const auto stream = m_avformat_context->streams[index]; + avformat_seek_file(m_avformat_context.get(), index, 0, 0, stream->duration, + 0); + } + if (m_audio_stream_index.has_value()) { + const auto index = m_audio_stream_index.value(); + const auto stream = m_avformat_context->streams[index]; + avformat_seek_file(m_avformat_context.get(), index, 0, 0, stream->duration, + 0); + } + continue; + } else { + LOG_INFO(Lib_AvPlayer, "EOF reached in demuxer. Exiting."); + break; + } } else { LOG_ERROR(Lib_AvPlayer, "Could not read AV frame: error = {}", res); m_state.OnError(); @@ -435,14 +455,20 @@ void AvPlayerSource::DemuxerThread(std::stop_token stop) { } if (up_packet->stream_index == m_video_stream_index) { m_video_packets.Push(std::move(up_packet)); + m_video_packets_cv.Notify(); } else if (up_packet->stream_index == m_audio_stream_index) { m_audio_packets.Push(std::move(up_packet)); + m_audio_packets_cv.Notify(); } } m_is_eof = true; - void* res; + m_video_packets_cv.Notify(); + m_audio_packets_cv.Notify(); + m_video_frames_cv.Notify(); + m_audio_frames_cv.Notify(); + if (m_video_decoder_thread.joinable()) { m_video_decoder_thread.join(); } @@ -457,7 +483,7 @@ void AvPlayerSource::DemuxerThread(std::stop_token stop) { AvPlayerSource::AVFramePtr AvPlayerSource::ConvertVideoFrame(const AVFrame& frame) { auto nv12_frame = AVFramePtr{av_frame_alloc(), &ReleaseAVFrame}; nv12_frame->pts = frame.pts; - nv12_frame->pkt_dts = frame.pkt_dts; + nv12_frame->pkt_dts = frame.pkt_dts < 0 ? 0 : frame.pkt_dts; nv12_frame->format = AV_PIX_FMT_NV12; nv12_frame->width = frame.width; nv12_frame->height = frame.height; @@ -520,8 +546,8 @@ void AvPlayerSource::VideoDecoderThread(std::stop_token stop) { using namespace std::chrono; LOG_INFO(Lib_AvPlayer, "Video Decoder Thread started"); while ((!m_is_eof || m_video_packets.Size() != 0) && !stop.stop_requested()) { - if (m_video_packets.Size() == 0) { - std::this_thread::sleep_for(milliseconds(5)); + if (!m_video_packets_cv.Wait( + stop, [this]() { return m_video_packets.Size() != 0 || m_is_eof; })) { continue; } const auto packet = m_video_packets.Pop(); @@ -537,8 +563,10 @@ void AvPlayerSource::VideoDecoderThread(std::stop_token stop) { return; } while (res >= 0) { - if (m_video_buffers.Size() == 0 && !stop.stop_requested()) { - std::this_thread::sleep_for(milliseconds(5)); + if (!m_video_buffers_cv.Wait(stop, [this]() { return m_video_buffers.Size() != 0; })) { + break; + } + if (m_video_buffers.Size() == 0) { continue; } auto up_frame = AVFramePtr(av_frame_alloc(), &ReleaseAVFrame); @@ -566,8 +594,7 @@ void AvPlayerSource::VideoDecoderThread(std::stop_token stop) { } else { m_video_frames.Push(PrepareVideoFrame(std::move(buffer.value()), *up_frame)); } - LOG_TRACE(Lib_AvPlayer, "Produced Video Frame. Num Frames: {}", - m_video_frames.Size()); + m_video_frames_cv.Notify(); } } } @@ -578,7 +605,7 @@ void AvPlayerSource::VideoDecoderThread(std::stop_token stop) { AvPlayerSource::AVFramePtr AvPlayerSource::ConvertAudioFrame(const AVFrame& frame) { auto pcm16_frame = AVFramePtr{av_frame_alloc(), &ReleaseAVFrame}; pcm16_frame->pts = frame.pts; - pcm16_frame->pkt_dts = frame.pkt_dts; + pcm16_frame->pkt_dts = frame.pkt_dts < 0 ? 0 : frame.pkt_dts; pcm16_frame->format = AV_SAMPLE_FMT_S16; pcm16_frame->ch_layout = frame.ch_layout; pcm16_frame->sample_rate = frame.sample_rate; @@ -638,8 +665,8 @@ void AvPlayerSource::AudioDecoderThread(std::stop_token stop) { using namespace std::chrono; LOG_INFO(Lib_AvPlayer, "Audio Decoder Thread started"); while ((!m_is_eof || m_audio_packets.Size() != 0) && !stop.stop_requested()) { - if (m_audio_packets.Size() == 0) { - std::this_thread::sleep_for(milliseconds(5)); + if (!m_audio_packets_cv.Wait( + stop, [this]() { return m_audio_packets.Size() != 0 || m_is_eof; })) { continue; } const auto packet = m_audio_packets.Pop(); @@ -654,10 +681,13 @@ void AvPlayerSource::AudioDecoderThread(std::stop_token stop) { return; } while (res >= 0) { - if (m_audio_buffers.Size() == 0 && !stop.stop_requested()) { - std::this_thread::sleep_for(milliseconds(5)); + if (!m_audio_buffers_cv.Wait(stop, [this]() { return m_audio_buffers.Size() != 0; })) { + break; + } + if (m_audio_buffers.Size() == 0) { continue; } + auto up_frame = AVFramePtr(av_frame_alloc(), &ReleaseAVFrame); res = avcodec_receive_frame(m_audio_codec_context.get(), up_frame.get()); if (res < 0) { @@ -683,8 +713,7 @@ void AvPlayerSource::AudioDecoderThread(std::stop_token stop) { } else { m_audio_frames.Push(PrepareAudioFrame(std::move(buffer.value()), *up_frame)); } - LOG_TRACE(Lib_AvPlayer, "Produced Audio Frame. Num Frames: {}", - m_audio_frames.Size()); + m_audio_frames_cv.Notify(); } } } diff --git a/src/core/libraries/avplayer/avplayer_source.h b/src/core/libraries/avplayer/avplayer_source.h index b3fbb30d..7144e7ee 100644 --- a/src/core/libraries/avplayer/avplayer_source.h +++ b/src/core/libraries/avplayer/avplayer_source.h @@ -7,12 +7,13 @@ #include "avplayer_common.h" #include "avplayer_data_streamer.h" -#include "common/types.h" #include "common/polyfill_thread.h" +#include "common/types.h" #include "core/libraries/kernel/thread_management.h" #include #include +#include #include #include #include @@ -87,6 +88,36 @@ struct Frame { SceAvPlayerFrameInfoEx info; }; +class EventCV { +public: + template + void Wait(Pred pred) { + std::unique_lock lock(m_mutex); + m_cv.wait(lock, std::move(pred)); + } + + template + bool Wait(std::stop_token stop, Pred pred) { + std::unique_lock lock(m_mutex); + return m_cv.wait(lock, std::move(stop), std::move(pred)); + } + + template + bool WaitFor(std::chrono::duration timeout, Pred pred) { + std::unique_lock lock(m_mutex); + return m_cv.wait_for(lock, timeout, std::move(pred)); + } + + void Notify() { + std::unique_lock lock(m_mutex); + m_cv.notify_all(); + } + +private: + std::mutex m_mutex{}; + std::condition_variable_any m_cv{}; +}; + class AvPlayerSource { public: AvPlayerSource(AvPlayerStateCallback& state, std::string_view path, @@ -139,7 +170,7 @@ private: AvPlayerStateCallback& m_state; SceAvPlayerMemAllocator m_memory_replacement{}; - u64 m_num_output_video_framebuffers{}; + u32 m_num_output_video_framebuffers{}; std::atomic_bool m_is_looping = false; std::atomic_bool m_is_eof = false; @@ -161,7 +192,17 @@ private: std::optional m_video_stream_index{}; std::optional m_audio_stream_index{}; - std::mutex m_state_mutex; + EventCV m_audio_packets_cv{}; + EventCV m_audio_frames_cv{}; + EventCV m_audio_buffers_cv{}; + + EventCV m_video_packets_cv{}; + EventCV m_video_frames_cv{}; + EventCV m_video_buffers_cv{}; + + EventCV m_stop_cv{}; + + std::mutex m_state_mutex{}; std::jthread m_demuxer_thread{}; std::jthread m_video_decoder_thread{}; std::jthread m_audio_decoder_thread{}; diff --git a/src/core/libraries/avplayer/avplayer_state.cpp b/src/core/libraries/avplayer/avplayer_state.cpp index 061d1da6..884cd940 100644 --- a/src/core/libraries/avplayer/avplayer_state.cpp +++ b/src/core/libraries/avplayer/avplayer_state.cpp @@ -104,8 +104,9 @@ AvPlayerState::AvPlayerState(const SceAvPlayerInitData& init_data) } AvPlayerState::~AvPlayerState() { - if (m_up_source && m_current_state == AvState::Play) { - m_up_source->Stop(); + { + std::unique_lock lock(m_source_mutex); + m_up_source.reset(); } if (m_controller_thread.joinable()) { m_controller_thread.request_stop(); @@ -121,18 +122,22 @@ s32 AvPlayerState::AddSource(std::string_view path, SceAvPlayerSourceType source return -1; } - if (m_up_source != nullptr) { - LOG_ERROR(Lib_AvPlayer, "Only one source is supported."); - return -1; - } + { + std::unique_lock lock(m_source_mutex); + if (m_up_source != nullptr) { + LOG_ERROR(Lib_AvPlayer, "Only one source is supported."); + return -1; + } - m_up_source = std::make_unique(*this, path, m_init_data, source_type); + m_up_source = std::make_unique(*this, path, m_init_data, source_type); + } AddSourceEvent(); return 0; } // Called inside GAME thread s32 AvPlayerState::GetStreamCount() { + std::shared_lock lock(m_source_mutex); if (m_up_source == nullptr) { LOG_ERROR(Lib_AvPlayer, "Could not get stream count. No source."); return -1; @@ -142,6 +147,7 @@ s32 AvPlayerState::GetStreamCount() { // Called inside GAME thread s32 AvPlayerState::GetStreamInfo(u32 stream_index, SceAvPlayerStreamInfo& info) { + std::shared_lock lock(m_source_mutex); if (m_up_source == nullptr) { LOG_ERROR(Lib_AvPlayer, "Could not get stream {} info. No source.", stream_index); return -1; @@ -151,6 +157,7 @@ s32 AvPlayerState::GetStreamInfo(u32 stream_index, SceAvPlayerStreamInfo& info) // Called inside GAME thread s32 AvPlayerState::Start() { + std::shared_lock lock(m_source_mutex); if (m_up_source == nullptr || m_up_source->Start() < 0) { LOG_ERROR(Lib_AvPlayer, "Could not start playback."); return -1; @@ -199,6 +206,7 @@ void AvPlayerState::StartControllerThread() { // Called inside GAME thread bool AvPlayerState::EnableStream(u32 stream_index) { + std::shared_lock lock(m_source_mutex); if (m_up_source == nullptr) { return false; } @@ -207,6 +215,7 @@ bool AvPlayerState::EnableStream(u32 stream_index) { // Called inside GAME thread bool AvPlayerState::Stop() { + std::shared_lock lock(m_source_mutex); if (m_up_source == nullptr || m_current_state == AvState::Stop) { return false; } @@ -218,6 +227,7 @@ bool AvPlayerState::Stop() { } bool AvPlayerState::GetVideoData(SceAvPlayerFrameInfo& video_info) { + std::shared_lock lock(m_source_mutex); if (m_up_source == nullptr) { return false; } @@ -225,6 +235,7 @@ bool AvPlayerState::GetVideoData(SceAvPlayerFrameInfo& video_info) { } bool AvPlayerState::GetVideoData(SceAvPlayerFrameInfoEx& video_info) { + std::shared_lock lock(m_source_mutex); if (m_up_source == nullptr) { return false; } @@ -232,6 +243,7 @@ bool AvPlayerState::GetVideoData(SceAvPlayerFrameInfoEx& video_info) { } bool AvPlayerState::GetAudioData(SceAvPlayerFrameInfo& audio_info) { + std::shared_lock lock(m_source_mutex); if (m_up_source == nullptr) { return false; } @@ -239,6 +251,7 @@ bool AvPlayerState::GetAudioData(SceAvPlayerFrameInfo& audio_info) { } bool AvPlayerState::IsActive() { + std::shared_lock lock(m_source_mutex); if (m_up_source == nullptr) { return false; } @@ -247,6 +260,7 @@ bool AvPlayerState::IsActive() { } u64 AvPlayerState::CurrentTime() { + std::shared_lock lock(m_source_mutex); if (m_up_source == nullptr) { LOG_ERROR(Lib_AvPlayer, "Could not get current time. No source."); return 0; @@ -254,6 +268,16 @@ u64 AvPlayerState::CurrentTime() { return m_up_source->CurrentTime(); } +bool AvPlayerState::SetLooping(bool is_looping) { + std::shared_lock lock(m_source_mutex); + if (m_up_source == nullptr) { + LOG_ERROR(Lib_AvPlayer, "Could not set loop flag. No source."); + return false; + } + m_up_source->SetLooping(is_looping); + return true; +} + // May be called from different threads void AvPlayerState::OnWarning(u32 id) { // Forward to CONTROLLER thread @@ -313,6 +337,7 @@ bool AvPlayerState::SetState(AvState state) { // Called inside CONTROLLER thread std::optional AvPlayerState::OnBufferingCheckEvent(u32 num_frames) { + std::shared_lock lock(m_source_mutex); if (!m_up_source) { return std::nullopt; } @@ -351,6 +376,7 @@ void AvPlayerState::ProcessEvent() { break; } case AvEventType::AddSource: { + std::shared_lock lock(m_source_mutex); if (m_up_source->FindStreamInfo()) { SetState(AvState::Ready); OnPlaybackStateChanged(AvState::Ready); diff --git a/src/core/libraries/avplayer/avplayer_state.h b/src/core/libraries/avplayer/avplayer_state.h index eed3265f..ff80b6ce 100644 --- a/src/core/libraries/avplayer/avplayer_state.h +++ b/src/core/libraries/avplayer/avplayer_state.h @@ -12,6 +12,7 @@ #include #include +#include namespace Libraries::AvPlayer { @@ -34,6 +35,7 @@ public: bool GetVideoData(SceAvPlayerFrameInfoEx& video_info); bool IsActive(); u64 CurrentTime(); + bool SetLooping(bool is_looping); private: using ScePthreadMutex = Kernel::ScePthreadMutex; @@ -76,6 +78,7 @@ private: u32 m_thread_affinity; std::atomic_uint32_t m_some_event_result{}; + std::shared_mutex m_source_mutex{}; std::mutex m_state_machine_mutex{}; std::mutex m_event_handler_mutex{}; std::jthread m_controller_thread{};