Object Reuse Technology Deep Dive: PublishWriter, AVFrame, and ReuseArray in Reducing GC Pressure
Introduction
In high-performance streaming media processing systems, frequent creation and destruction of small objects can lead to significant garbage collection (GC) pressure, severely impacting system performance. This article provides an in-depth analysis of the object reuse mechanisms in three core components of the Monibuca v5 streaming framework: PublishWriter, AVFrame, and ReuseArray, demonstrating how carefully designed memory management strategies can significantly reduce GC overhead.
1. Problem Background: GC Pressure and Performance Bottlenecks
1.1 GC Pressure Issues in Legacy WriteAudio/WriteVideo
Let's examine the specific implementation of the WriteAudio
method in the legacy version of Monibuca to understand the GC pressure it generates:
// Key problematic code in legacy WriteAudio method
func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
// 1. Each call may create a new AVTrack
if t == nil {
t = NewAVTrack(data, ...) // New object creation
}
// 2. Create new wrapper objects for each sub-track - main source of GC pressure
for i, track := range p.AudioTrack.Items[1:] {
toType := track.FrameType.Elem()
// Use reflect.New() to create new objects every time
toFrame := reflect.New(toType).Interface().(IAVFrame)
t.Value.Wraps = append(t.Value.Wraps, toFrame) // Memory allocation
}
}
GC Pressure Analysis in Legacy Version:
Frequent Object Creation:
- Each call to
WriteAudio
may create a newAVTrack
- Create new wrapper objects for each sub-track using
reflect.New()
- Create new
IAVFrame
instances every time
- Each call to
Memory Allocation Overhead:
- Reflection overhead from
reflect.New(toType)
- Dynamic type conversion:
Interface().(IAVFrame)
- Frequent slice expansion:
append(t.Value.Wraps, toFrame)
- Reflection overhead from
GC Pressure Scenarios:
// 30fps video stream, 30 calls per second
for i := 0; i < 30; i++ {
audioFrame := &AudioFrame{Data: audioData}
publisher.WriteAudio(audioFrame) // Each call creates multiple objects
}
1.2 Object Reuse Solution in New Version
The new version implements object reuse through the PublishWriter pattern:
// New version - Object reuse approach
func publishWithReuse(publisher *Publisher) {
// 1. Create memory allocator with pre-allocated memory
allocator := util.NewScalableMemoryAllocator(1 << 12)
defer allocator.Recycle()
// 2. Create writer with object reuse
writer := m7s.NewPublisherWriter[*AudioFrame, *VideoFrame](publisher, allocator)
// 3. Reuse writer.AudioFrame to avoid creating new objects
for i := 0; i < 30; i++ {
copy(writer.AudioFrame.NextN(len(audioData)), audioData)
writer.NextAudio() // Reuse object, no new object creation
}
}
Advantages of New Version:
- Zero Object Creation: Reuse
writer.AudioFrame
, avoiding new object creation each time - Pre-allocated Memory: Pre-allocated memory pool through
ScalableMemoryAllocator
- Eliminate Reflection Overhead: Use generics to avoid
reflect.New()
- Reduce GC Pressure: Object reuse significantly reduces GC frequency
2. Version Comparison: From WriteAudio/WriteVideo to PublishWriter
2.1 Legacy Version (v5.0.5 and earlier) Usage
In Monibuca v5.0.5 and earlier versions, publishing audio/video data used direct WriteAudio and WriteVideo methods:
// Legacy version usage
func publishWithOldAPI(publisher *Publisher) {
audioFrame := &AudioFrame{Data: audioData}
publisher.WriteAudio(audioFrame) // Create new object each time
videoFrame := &VideoFrame{Data: videoData}
publisher.WriteVideo(videoFrame) // Create new object each time
}
Core Issues with Legacy WriteAudio/WriteVideo:
From the actual code, we can see that the legacy version creates objects on every call:
- Create New AVTrack (if it doesn't exist):
if t == nil {
t = NewAVTrack(data, ...) // New object creation
}
- Create Multiple Wrapper Objects:
// Create new wrapper objects for each sub-track
for i, track := range p.AudioTrack.Items[1:] {
toFrame := reflect.New(toType).Interface().(IAVFrame) // Create new object every time
t.Value.Wraps = append(t.Value.Wraps, toFrame)
}
Problems with Legacy Version:
- Create new Frame objects and wrapper objects on every call
- Use
reflect.New()
for dynamic object creation with high performance overhead - Cannot control memory allocation strategy
- Lack object reuse mechanism
- High GC pressure
2.2 New Version (v5.1.0+) PublishWriter Pattern
The new version introduces a generic-based PublishWriter pattern that implements object reuse:
// New version usage
func publishWithNewAPI(publisher *Publisher) {
allocator := util.NewScalableMemoryAllocator(1 << 12)
defer allocator.Recycle()
writer := m7s.NewPublisherWriter[*AudioFrame, *VideoFrame](publisher, allocator)
// Reuse objects to avoid creating new objects
copy(writer.AudioFrame.NextN(len(audioData)), audioData)
writer.NextAudio()
copy(writer.VideoFrame.NextN(len(videoData)), videoData)
writer.NextVideo()
}
2.3 Migration Guide
2.3.1 Basic Migration Steps
- Replace Object Creation Method
// Legacy version - Create new object each time
audioFrame := &AudioFrame{Data: data}
publisher.WriteAudio(audioFrame) // Internally creates multiple wrapper objects
// New version - Reuse objects
allocator := util.NewScalableMemoryAllocator(1 << 12)
defer allocator.Recycle()
writer := m7s.NewPublisherWriter[*AudioFrame, *VideoFrame](publisher, allocator)
copy(writer.AudioFrame.NextN(len(data)), data)
writer.NextAudio() // Reuse object, no new object creation
- Add Memory Management
// New version must add memory allocator
allocator := util.NewScalableMemoryAllocator(1 << 12)
defer allocator.Recycle() // Ensure resource release
- Use Generic Types
// Explicitly specify audio/video frame types
writer := m7s.NewPublisherWriter[*format.RawAudio, *format.H26xFrame](publisher, allocator)
2.3.2 Common Migration Scenarios
Scenario 1: Simple Audio/Video Publishing
// Legacy version
func simplePublish(publisher *Publisher, audioData, videoData []byte) {
publisher.WriteAudio(&AudioFrame{Data: audioData})
publisher.WriteVideo(&VideoFrame{Data: videoData})
}
// New version
func simplePublish(publisher *Publisher, audioData, videoData []byte) {
allocator := util.NewScalableMemoryAllocator(1 << 12)
defer allocator.Recycle()
writer := m7s.NewPublisherWriter[*AudioFrame, *VideoFrame](publisher, allocator)
copy(writer.AudioFrame.NextN(len(audioData)), audioData)
writer.NextAudio()
copy(writer.VideoFrame.NextN(len(videoData)), videoData)
writer.NextVideo()
}
Scenario 2: Stream Transformation Processing
// Legacy version - Create new objects for each transformation
func transformStream(subscriber *Subscriber, publisher *Publisher) {
m7s.PlayBlock(subscriber,
func(audio *AudioFrame) error {
return publisher.WriteAudio(audio) // Create new object each time
},
func(video *VideoFrame) error {
return publisher.WriteVideo(video) // Create new object each time
})
}
// New version - Reuse objects to avoid repeated creation
func transformStream(subscriber *Subscriber, publisher *Publisher) {
allocator := util.NewScalableMemoryAllocator(1 << 12)
defer allocator.Recycle()
writer := m7s.NewPublisherWriter[*AudioFrame, *VideoFrame](publisher, allocator)
m7s.PlayBlock(subscriber,
func(audio *AudioFrame) error {
audio.CopyTo(writer.AudioFrame.NextN(audio.Size))
return writer.NextAudio() // Reuse object
},
func(video *VideoFrame) error {
video.CopyTo(writer.VideoFrame.NextN(video.Size))
return writer.NextVideo() // Reuse object
})
}
Scenario 3: Multi-format Conversion Processing
// Legacy version - Create new objects for each sub-track
func handleMultiFormatOld(publisher *Publisher, data IAVFrame) {
publisher.WriteAudio(data) // Internally creates new objects for each sub-track
}
// New version - Pre-allocate and reuse
func handleMultiFormatNew(publisher *Publisher, data IAVFrame) {
allocator := util.NewScalableMemoryAllocator(1 << 12)
defer allocator.Recycle()
writer := m7s.NewPublisherWriter[*AudioFrame, *VideoFrame](publisher, allocator)
// Reuse writer object to avoid creating new objects for each sub-track
data.CopyTo(writer.AudioFrame.NextN(data.GetSize()))
writer.NextAudio()
}
3. Core Components Deep Dive
3.1 ReuseArray: The Core of Generic Object Pool
ReuseArray
is the foundation of the entire object reuse system. It's a generic-based object reuse array that implements "expand on demand, smart reset":
type ReuseArray[T any] []T
func (s *ReuseArray[T]) GetNextPointer() (r *T) {
ss := *s
l := len(ss)
if cap(ss) > l {
// Sufficient capacity, directly extend length - zero allocation
ss = ss[:l+1]
} else {
// Insufficient capacity, create new element - only this one allocation
var new T
ss = append(ss, new)
}
*s = ss
r = &((ss)[l])
// If object implements Resetter interface, auto-reset
if resetter, ok := any(r).(Resetter); ok {
resetter.Reset()
}
return r
}
3.1.1 Core Design Philosophy
1. Smart Capacity Management
// First call: Create new object
nalu1 := nalus.GetNextPointer() // Allocate new Memory object
// Subsequent calls: Reuse allocated objects
nalu2 := nalus.GetNextPointer() // Reuse nalu1's memory space
nalu3 := nalus.GetNextPointer() // Reuse nalu1's memory space
2. Automatic Reset Mechanism
type Resetter interface {
Reset()
}
// Memory type implements Resetter interface
func (m *Memory) Reset() {
m.Buffers = m.Buffers[:0] // Reset slice length, preserve capacity
m.Size = 0
}
3.1.2 Real Application Scenarios
Scenario 1: Object Reuse in NALU Processing
// In video frame processing, NALU array uses ReuseArray
type Nalus = util.ReuseArray[util.Memory]
func (r *VideoFrame) Demux() error {
nalus := r.GetNalus() // Get NALU reuse array
for packet := range r.Packets.RangePoint {
// Get reused NALU object each time, avoid creating new objects
nalu := nalus.GetNextPointer() // Reuse object
nalu.PushOne(packet.Payload) // Fill data
}
}
Scenario 2: SEI Insertion Processing
SEI insertion achieves efficient processing through object reuse:
func (t *Transformer) Run() (err error) {
allocator := util.NewScalableMemoryAllocator(1 << util.MinPowerOf2)
defer allocator.Recycle()
writer := m7s.NewPublisherWriter[*format.RawAudio, *format.H26xFrame](pub, allocator)
return m7s.PlayBlock(t.TransformJob.Subscriber,
func(video *format.H26xFrame) (err error) {
nalus := writer.VideoFrame.GetNalus() // Reuse NALU array
// Process each NALU, reuse NALU objects
for nalu := range video.Raw.(*pkg.Nalus).RangePoint {
p := nalus.GetNextPointer() // Reuse object, auto Reset()
mem := writer.VideoFrame.NextN(nalu.Size)
nalu.CopyTo(mem)
// Insert SEI data
if len(seis) > 0 {
for _, sei := range seis {
p.Push(append([]byte{byte(codec.NALU_SEI)}, sei...))
}
}
p.PushOne(mem)
}
return writer.NextVideo() // Reuse VideoFrame object
})
}
Key Advantage: Through nalus.GetNextPointer()
reusing NALU objects, avoiding creating new objects for each NALU, significantly reducing GC pressure.
Scenario 3: RTP Packet Processing
func (r *VideoFrame) Demux() error {
nalus := r.GetNalus()
var nalu *util.Memory
for packet := range r.Packets.RangePoint {
switch t := codec.ParseH264NALUType(b0); t {
case codec.NALU_STAPA, codec.NALU_STAPB:
// Process aggregation packets, each NALU reuses objects
for buffer := util.Buffer(packet.Payload[offset:]); buffer.CanRead(); {
if nextSize := int(buffer.ReadUint16()); buffer.Len() >= nextSize {
nalus.GetNextPointer().PushOne(buffer.ReadN(nextSize))
}
}
case codec.NALU_FUA, codec.NALU_FUB:
// Process fragmented packets, reuse same NALU object
if util.Bit1(b1, 0) {
nalu = nalus.GetNextPointer() // Reuse object
nalu.PushOne([]byte{naluType.Or(b0 & 0x60)})
}
if nalu != nil && nalu.Size > 0 {
nalu.PushOne(packet.Payload[offset:])
}
}
}
}
3.1.3 Performance Advantage Analysis
Problems with Traditional Approach:
// Legacy version - Create new object each time
func processNalusOld(packets []RTPPacket) {
var nalus []util.Memory
for _, packet := range packets {
nalu := util.Memory{} // Create new object each time
nalu.PushOne(packet.Payload)
nalus = append(nalus, nalu) // Memory allocation
}
}
Advantages of ReuseArray:
// New version - Reuse objects
func processNalusNew(packets []RTPPacket) {
var nalus util.ReuseArray[util.Memory]
for _, packet := range packets {
nalu := nalus.GetNextPointer() // Reuse object, zero allocation
nalu.PushOne(packet.Payload)
}
}
Performance Comparison:
- Memory Allocation Count: Reduced from 1 per packet to 1 for first time only
- GC Pressure: Reduced by 90%+
- Processing Latency: Reduced by 50%+
- Memory Usage: Reduced memory fragmentation
3.1.4 Key Methods Deep Dive
GetNextPointer() - Core Reuse Method
func (s *ReuseArray[T]) GetNextPointer() (r *T) {
ss := *s
l := len(ss)
if cap(ss) > l {
// Key optimization: prioritize using allocated memory
ss = ss[:l+1] // Only extend length, don't allocate new memory
} else {
// Only allocate new memory when necessary
var new T
ss = append(ss, new)
}
*s = ss
r = &((ss)[l])
// Auto-reset to ensure consistent object state
if resetter, ok := any(r).(Resetter); ok {
resetter.Reset()
}
return r
}
Reset() - Batch Reset
func (s *ReuseArray[T]) Reset() {
*s = (*s)[:0] // Reset length, preserve capacity
}
Reduce() - Reduce Elements
func (s *ReuseArray[T]) Reduce() {
ss := *s
*s = ss[:len(ss)-1] // Reduce last element
}
RangePoint() - Efficient Iteration
func (s ReuseArray[T]) RangePoint(f func(yield *T) bool) {
for i := range len(s) {
if !f(&s[i]) { // Pass pointer, avoid copy
return
}
}
}
3.2 AVFrame: Audio/Video Frame Object Reuse
AVFrame
uses a layered design, integrating RecyclableMemory
for fine-grained memory management:
type AVFrame struct {
DataFrame
*Sample
Wraps []IAVFrame // Encapsulation format array
}
type Sample struct {
codec.ICodecCtx
util.RecyclableMemory // Recyclable memory
*BaseSample
}
Memory Management Mechanism:
func (r *RecyclableMemory) Recycle() {
if r.recycleIndexes != nil {
for _, index := range r.recycleIndexes {
r.allocator.Free(r.Buffers[index]) // Precise recycling
}
r.recycleIndexes = r.recycleIndexes[:0]
}
r.Reset()
}
3.3 PublishWriter: Object Reuse for Streaming Writes
PublishWriter
uses generic design, supporting separate audio/video write modes:
type PublishWriter[A IAVFrame, V IAVFrame] struct {
*PublishAudioWriter[A]
*PublishVideoWriter[V]
}
Usage Flow:
// 1. Create allocator
allocator := util.NewScalableMemoryAllocator(1 << 12)
defer allocator.Recycle()
// 2. Create writer
writer := m7s.NewPublisherWriter[*AudioFrame, *VideoFrame](publisher, allocator)
// 3. Reuse objects to write data
writer.AudioFrame.SetTS32(timestamp)
copy(writer.AudioFrame.NextN(len(data)), data)
writer.NextAudio()
4. Performance Optimization Results
4.1 Memory Allocation Comparison
Scenario | Legacy WriteAudio/WriteVideo | New PublishWriter | Performance Improvement |
---|---|---|---|
30fps video stream | 30 objects/sec + multiple wrapper objects | 0 new object creation | 100% |
Memory allocation count | High frequency allocation + reflect.New() overhead | Pre-allocate + reuse | 90%+ |
GC pause time | Frequent pauses | Significantly reduced | 80%+ |
Multi-format conversion | Create new objects for each sub-track | Reuse same object | 95%+ |
4.2 Actual Test Data
// Performance test comparison
func BenchmarkOldVsNew(b *testing.B) {
// Legacy version test
b.Run("OldWriteAudio", func(b *testing.B) {
for i := 0; i < b.N; i++ {
frame := &AudioFrame{Data: make([]byte, 1024)}
publisher.WriteAudio(frame) // Create multiple objects each time
}
})
// New version test
b.Run("NewPublishWriter", func(b *testing.B) {
allocator := util.NewScalableMemoryAllocator(1 << 12)
defer allocator.Recycle()
writer := m7s.NewPublisherWriter[*AudioFrame, *VideoFrame](publisher, allocator)
b.ResetTimer()
for i := 0; i < b.N; i++ {
copy(writer.AudioFrame.NextN(1024), make([]byte, 1024))
writer.NextAudio() // Reuse object, no new object creation
}
})
}
Test Results:
- Memory Allocation Count: Reduced from 10+ per frame (including wrapper objects) to 0
- reflect.New() Overhead: Reduced from overhead on every call to 0
- GC Pressure: Reduced by 90%+
- Processing Latency: Reduced by 60%+
- Throughput: Improved by 3-5x
- Multi-format Conversion Performance: Improved by 5-10x (avoid creating objects for each sub-track)
5. Best Practices and Considerations
5.1 Migration Best Practices
5.1.1 Gradual Migration
// Step 1: Keep original logic, add allocator
func migrateStep1(publisher *Publisher) {
allocator := util.NewScalableMemoryAllocator(1 << 12)
defer allocator.Recycle()
// Temporarily keep old way, but added memory management
frame := &AudioFrame{Data: data}
publisher.WriteAudio(frame)
}
// Step 2: Gradually replace with PublishWriter
func migrateStep2(publisher *Publisher) {
allocator := util.NewScalableMemoryAllocator(1 << 12)
defer allocator.Recycle()
writer := m7s.NewPublisherWriter[*AudioFrame, *VideoFrame](publisher, allocator)
copy(writer.AudioFrame.NextN(len(data)), data)
writer.NextAudio()
}
5.1.2 Memory Allocator Selection
// Choose appropriate allocator size based on scenario
var allocator *util.ScalableMemoryAllocator
switch scenario {
case "high_fps":
allocator = util.NewScalableMemoryAllocator(1 << 14) // 16KB
case "low_latency":
allocator = util.NewScalableMemoryAllocator(1 << 10) // 1KB
case "high_throughput":
allocator = util.NewScalableMemoryAllocator(1 << 16) // 64KB
}
5.2 Common Pitfalls and Solutions
5.2.1 Forgetting Resource Release
// Wrong: Forget to recycle memory
func badExample() {
allocator := util.NewScalableMemoryAllocator(1 << 12)
// Forget defer allocator.Recycle()
}
// Correct: Ensure resource release
func goodExample() {
allocator := util.NewScalableMemoryAllocator(1 << 12)
defer allocator.Recycle() // Ensure release
}
5.2.2 Type Mismatch
// Wrong: Type mismatch
writer := m7s.NewPublisherWriter[*AudioFrame, *VideoFrame](publisher, allocator)
writer.AudioFrame = &SomeOtherFrame{} // Type error
// Correct: Use matching types
writer := m7s.NewPublisherWriter[*format.RawAudio, *format.H26xFrame](publisher, allocator)
6. Real Application Cases
6.1 WebRTC Stream Processing Migration
// Legacy WebRTC processing
func handleWebRTCOld(track *webrtc.TrackRemote, publisher *Publisher) {
for {
buf := make([]byte, 1500)
n, _, err := track.Read(buf)
if err != nil {
return
}
frame := &VideoFrame{Data: buf[:n]}
publisher.WriteVideo(frame) // Create new object each time
}
}
// New WebRTC processing
func handleWebRTCNew(track *webrtc.TrackRemote, publisher *Publisher) {
allocator := util.NewScalableMemoryAllocator(1 << 12)
defer allocator.Recycle()
writer := m7s.NewPublishVideoWriter[*VideoFrame](publisher, allocator)
for {
buf := allocator.Malloc(1500)
n, _, err := track.Read(buf)
if err != nil {
return
}
writer.VideoFrame.AddRecycleBytes(buf[:n])
writer.NextVideo() // Reuse object
}
}
6.2 FLV File Stream Pulling Migration
// Legacy FLV stream pulling
func pullFLVOld(publisher *Publisher, file *os.File) {
for {
tagType, data, timestamp := readFLVTag(file)
switch tagType {
case FLV_TAG_TYPE_VIDEO:
frame := &VideoFrame{Data: data, Timestamp: timestamp}
publisher.WriteVideo(frame) // Create new object each time
}
}
}
// New FLV stream pulling
func pullFLVNew(publisher *Publisher, file *os.File) {
allocator := util.NewScalableMemoryAllocator(1 << 12)
defer allocator.Recycle()
writer := m7s.NewPublisherWriter[*AudioFrame, *VideoFrame](publisher, allocator)
for {
tagType, data, timestamp := readFLVTag(file)
switch tagType {
case FLV_TAG_TYPE_VIDEO:
writer.VideoFrame.SetTS32(timestamp)
copy(writer.VideoFrame.NextN(len(data)), data)
writer.NextVideo() // Reuse object
}
}
}
7. Summary
7.1 Core Advantages
By migrating from the legacy WriteAudio/WriteVideo to the new PublishWriter pattern, you can achieve:
- Significantly Reduce GC Pressure: Convert frequent small object creation to object state reset through object reuse
- Improve Memory Utilization: Reduce memory fragmentation through pre-allocation and smart expansion
- Reduce Processing Latency: Reduce GC pause time, improve real-time performance
- Increase System Throughput: Reduce memory allocation overhead, improve processing efficiency
7.2 Migration Recommendations
- Gradual Migration: First add memory allocator, then gradually replace with PublishWriter
- Type Safety: Use generics to ensure type matching
- Resource Management: Always use defer to ensure resource release
- Performance Monitoring: Add memory usage monitoring for performance tuning
7.3 Applicable Scenarios
This object reuse mechanism is particularly suitable for:
- High frame rate audio/video processing
- Real-time streaming media systems
- High-frequency data processing
- Latency-sensitive applications
By properly applying these technologies, you can significantly improve system performance and stability, providing a solid technical foundation for high-concurrency, low-latency streaming media applications.