跳转到内容
View in the app

A better way to browse. Learn more.

彼岸论坛

A full-screen app on your home screen with push notifications, badges and more.

To install this app on iOS and iPadOS
  1. Tap the Share icon in Safari
  2. Scroll the menu and tap Add to Home Screen.
  3. Tap Add in the top-right corner.
To install this app on Android
  1. Tap the 3-dot menu (⋮) in the top-right corner of the browser.
  2. Tap Add to Home screen or Install app.
  3. Confirm by tapping Install.
欢迎抵达彼岸 彼岸花开 此处谁在 -彼岸论坛

[Go 编程语言] 各位大佬,请教一个 golang 多线程的阻塞问题

发表于

编程小白,在写一个多线程目录文件遍历的时候,出现了阻塞问题,求教各位大佬~

通过增大 var taskChan = make(chan string, 1000),chan 缓冲区为 100 万的时候程序不会阻塞

但是我通过打印日志发现 taskChan 占用很小,只有十几,而且存在通道写入失败的情况

taskChan 的缓冲区为 1000 时,阻塞的日志如下:

[DEBUG]:增加目录,增加 wg, [1802], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1843], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1844], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1801], taskChan = [5]
[DEBUG]:任务完成,减小 wg, [1798], taskChan = [1]
[DEBUG]:任务完成,减小 wg, [1797], taskChan = [17]
[DEBUG]:任务完成,减小 wg, [1816], taskChan = [4]
[DEBUG]:增加目录,增加 wg, [1803], taskChan = [1]
[DEBUG]:任务完成,减小 wg, [1843], taskChan = [1]
[DEBUG]:任务完成,减小 wg, [1797], taskChan = [21]
[DEBUG]:任务完成,减小 wg, [1841], taskChan = [24]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1840], taskChan = [6]
[DEBUG]:任务完成,减小 wg, [1798], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1840], taskChan = [13]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1840], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [2]
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1843], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1844], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1845], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1846], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1847], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1848], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1849], taskChan = [0]

如果把 taskChan 的缓冲区为 100 万的时候,程序可以正常退出,日志如下:

[DEBUG]:增加目录,增加 wg, [2], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [3], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [4], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [3], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [3], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [3], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [4], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [2], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [2], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [2], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [0], taskChan = [0]
[INFO]:目录扫描完毕
[DEBUG]:func GetAllFilePath end
[DEBUG]:func StartScan end
[DEBUG]:func btnStartScanOnclick end

代码如下:

package core

import (
	"DopliGo/logs"
	"github.com/panjf2000/ants/v2"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
)

func GetAllFilePath(rootPath string) {
	//logs.IsLogDebug = false
	logs.Debug("func GetAllFilePath start")
	// 创建任务通道和结果通道
	var taskChan = make(chan string, 1000000)
	var resultChan = make(chan string, 1000000)
	var wg sync.WaitGroup
	var counter int64 = 0

	// 创建生产者 goroutine 池
	producerPool, _ := ants.NewPoolWithFunc(16, func(i interface{}) {
		produceTasks(i.(string), taskChan, resultChan, &counter, &wg)
	})

	logs.Debug("cap:%d", producerPool.Cap())
	defer producerPool.Release()

	taskChan <- rootPath
	wg.Add(1) // 这里增加计数器
	atomic.AddInt64(&counter, 1)
	logs.Debug("任务开始,增加 wg, [%d], taskChan = [%d]", atomic.LoadInt64(&counter), len(resultChan))

	// 启动生产者
	go func() {
		//defer logs.Debug("生产者退出")
		for task := range taskChan {
			err := producerPool.Invoke(task)
			if err != nil {
				logs.Error("failed to producerPool Invoke, err: %s", err)
				return
			}
		}
	}()

	// 启动结果处理 goroutine
	go func() {
		//defer logs.Debug("消费者退出")
		for result := range resultChan {
			_ = result
		}
	}()

	// 等待所有任务完成
	wg.Wait()
	close(resultChan)
	close(taskChan)
	logs.Info("目录扫描完毕")
	logs.Debug("func GetAllFilePath end")
}

func produceTasks(rootPath string, taskChan chan string, resultChan chan string, counter *int64, wg *sync.WaitGroup) {
	defer wg.Done() // 确保每次 produceTasks 完成时,调用 Done
	//	logs.Debug("func produceTasks start")

	entries, err := os.ReadDir(rootPath)
	if err != nil {
		logs.Error("failed to read dir: %s , err: %s", rootPath, err)
		return
	}

	for _, entry := range entries {
		path := filepath.Join(rootPath, entry.Name())
		if entry.IsDir() {
			wg.Add(1)
			atomic.AddInt64(counter, 1)
			select {
			case taskChan <- path:
				// 发送成功
			default:
				// 发送失败,通道已满
				logs.Error("写入通道失败...")
			}
			logs.Debug("增加目录,增加 wg, [%d], taskChan = [%d]", atomic.LoadInt64(counter), len(resultChan))
		} else {
			resultChan <- path
		}
	}

	atomic.AddInt64(counter, -1)
	logs.Debug("任务完成,减小 wg, [%d], taskChan = [%d]", atomic.LoadInt64(counter), len(resultChan))
	//logs.Debug("func produceTasks end")
}

Featured Replies

No posts to show

创建帐户或登录来提出意见

Configure browser push notifications

Chrome (Android)
  1. Tap the lock icon next to the address bar.
  2. Tap Permissions → Notifications.
  3. Adjust your preference.
Chrome (Desktop)
  1. Click the padlock icon in the address bar.
  2. Select Site settings.
  3. Find Notifications and adjust your preference.