Await blocking other consumers in a for loop

timotech

Member
Joined
Aug 15, 2022
Messages
12
Programming Experience
10+
Hi guys,
Please I have a for loop containing calls to producers and consumers. Each producer in the loop performs audio samples collection in a blockingcollection<T>. I have 8 producers in total and apparently I will need 8 consumers. The challenge I am having is that in the for loop, I need to call await on the consumers, but immediately it hits the first await, it will neglect the rest. Only one producer will be working and only one consumer will be working. I know why, because definitely its supposed to await.
But if I remove the await, the program hangs. I know that it is hanging because of the consumer method, and its supposed to be called in 8 threads according to the loop. But instead the program hangs.
I think I have a design problem, but I don't know how to resolve it.
I hope you understand my problem.
Below is the code, thanks:

C#:
//Program call
private async void LoadThreads()
        {
            var tokenSource = new CancellationTokenSource();
            IModelService modelService = await GetModelService(); // initialize your data source

            TabControl.TabPageCollection pages = tabControl1.TabPages;

            for (int i = 0; i < pages.Count; i++) //pages.Count = 8
            {
                //producers
                Listener listener = new(cbBoxes[i].SelectedIndex, 5512);
                listeners.Add(listener);
                
                //consumers
                _ = await listeners[i].GetBestMatchForStream(modelService, tokenSource.Token, pages[i].Name);   
            }
        }

using SoundFingerprinting;
using SoundFingerprinting.Audio;
using SoundFingerprinting.Builder;
using SoundFingerprinting.Command;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
public class Listener
{
    public readonly int SampleRate;
    private readonly NAudio.Wave.WaveInEvent wvin;
    public string DetectedString { get; set; }
    public string LogString { get; set; }
    private readonly BlockingCollection<AudioSamples> realtimeSource;
    
    //producer
    public Listener(int deviceIndex, int sampleRate)
        {
            realtimeSource = new BlockingCollection<AudioSamples>();
            SampleRate = sampleRate;
            wvin = new NAudio.Wave.WaveInEvent
            {
                DeviceNumber = deviceIndex,
                WaveFormat = new NAudio.Wave.WaveFormat(sampleRate, bits: 16, channels: 1),
                BufferMilliseconds = 20
            };
            wvin.DataAvailable += OnNewAudioData;
            wvin.StartRecording();
        }
    
    private void OnNewAudioData(object sender, NAudio.Wave.WaveInEventArgs args)
    {
        short[] samples = new short[args.BytesRecorded / 2];
        Buffer.BlockCopy(args.Buffer, 0, samples, 0, args.BytesRecorded);
        // converting to [-1, +1] range
        float[] floats = Array.ConvertAll(samples, (sample => (float)sample / short.MaxValue));
        realtimeSource.Add(new AudioSamples(floats, string.Empty, SampleRate));
    }
    
//consumer code below
public async Task<double> GetBestMatchForStream(IModelService modelService, CancellationToken token, string tabName)
        {
            double seconds = await QueryCommandBuilder.Instance
                .BuildRealtimeQueryCommand()
                .From(new BlockingRealtimeCollection<AudioSamples>(realtimeSource))
                .WithRealtimeQueryConfig(config =>
                {
                    // match only those entries got at least 5 seconds of query match
                    config.ResultEntryFilter = new TrackMatchLengthEntryFilter(5d);
                    //config.QueryConfiguration.Audio.MaxTracksToReturn = 1; //May remove this after observation

                    // provide a success callback that will be invoked for matches that pass the result entry filter
                    config.SuccessCallback = result =>
                    {
                        foreach (var entry in result.ResultEntries)
                        {
                            //Debug.WriteLine($"Successfully matched {entry.TrackId}");
                            Debug.WriteLine($"Successfully matched {entry.Audio.Track.Title} at {DateTime.Now} on {tabName}");
                            DetectedString = $"Successfully matched {entry.Audio.Track.Title} at {DateTime.Now} on {tabName}";
                        }
                    };

                    config.DidNotPassFilterCallback = (queryResult) =>
                    {
                        foreach (var result in queryResult.ResultEntries)
                        {
                            //Debug.WriteLine($"Did not pass filter {result.TrackId}");
                            Debug.WriteLine($"Did not pass filter {result.Audio.Track.Title} at {DateTime.Now}");
                            LogString = $"Did not pass filter {result.Audio.Track.Title} at {DateTime.Now} on {tabName}";
                        }
                    };

                    return config;
                })
                .UsingServices(modelService)
                .Query(token);

            //Debug.WriteLine($"Realtime query stopped. Issued {seconds} seconds of query.");
            return seconds;
        }
}
 
Using await does not magically make your code multi threaded.
 
It's not clear why you need 8 consumers for 8 producers? In theory you could just above a single consumer and just make sure that it is reading from the appropriate collection as data becomes available in that collection.
 
It's not clear why you need 8 consumers for 8 producers? In theory you could just above a single consumer and just make sure that it is reading from the appropriate collection as data becomes available in that collection.
Hi @Skydiver, thanks for your comment. As a new person to multithreading, I knew their was a problem with my design. But didn't know how to resolve it.
I would so much prefer to use one consumer to read the producers.
I don't mind if you can point me to an article that addresses my concern or may be a code snippet.
Thanks a lot.
 
Grab a copy of "Concurrency in C# Cookbook" by Stephen Cleary Great explanations of various frameworks/technologies available, pros/cons, and best of all, how to use them effectively to solve particular problems.

Also worth reading are other articles by Stephen Cleary on using TPL.
 
Back
Top Bottom