// =====================================================================
// Copyright 2013-2017 Fluffy Underware
// All rights reserved
//
// http://www.fluffyunderware.com
// =====================================================================
using UnityEngine;
using System.Collections;
using System;
using System.Collections.Generic;
using System.Linq;
#if ENABLE_IL2CPP == false
using System.Linq.Expressions;
#endif
using System.Runtime.CompilerServices;
using UnityEngine.Assertions;
#if !UNITY_WSA && !UNITY_WEBGL
using System.Threading;
#endif
namespace FluffyUnderware.DevTools
{
///
/// A class to execute actions in a multi-threaded way
///
/// The type of the action input
public class ThreadPoolWorker : IDisposable
{
//TODO OPTIM Is ThreadPoolWorker still needed. Aren't all unity version handling .Net's parallel fors now?
//TODO use Windows.System.Threading.ThreadPool.RunAsync(workItem => SomeMethod()); #if NETFX_CORE
#if !UNITY_WSA && !UNITY_WEBGL
private readonly SimplePool queuedCallbackPool = new SimplePool(4);
private readonly SimplePool> loopStatePool = new SimplePool>(4);
private int _remainingWorkItems = 1;
private ManualResetEvent _done = new ManualResetEvent(false);
private WaitCallback handleWorkItemCallBack;
private WaitCallback handleLoopCallBack;
public ThreadPoolWorker()
{
handleWorkItemCallBack = o =>
{
QueuedCallback queuedCallback = (QueuedCallback)o;
try
{
queuedCallback.Callback(queuedCallback.State);
}
finally
{
lock (queuedCallbackPool)
queuedCallbackPool.ReleaseItem(queuedCallback);
DoneWorkItem();
}
};
handleLoopCallBack = state =>
{
LoopState loopS = (LoopState)state;
for (int i = loopS.StartIndex; i <= loopS.EndIndex; i++)
{
loopS.Action(loopS.Items.ElementAt(i), i, loopS.ItemsCount);
}
lock (loopStatePool)
loopStatePool.ReleaseItem(loopS);
};
}
[Obsolete("Use ParallelFor(Action action, IEnumerable list) instead")]
public void ParralelFor(Action action, List list)
{
ParallelFor((item, itemIndex, itemsCount) => action(item), list, list.Count());
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void ParallelFor(Action action, IEnumerable list)
{
ParallelFor(action, list, list.Count());
}
public void ParallelFor(Action action, IEnumerable list, int elementsCount)
{
int threadsToUseCount;
{
int availableThreads;
#if NET_4_6
int temp;
ThreadPool.GetAvailableThreads(out availableThreads, out temp);
#else
availableThreads = Environment.ProcessorCount - 1;
#endif
threadsToUseCount = 1 /*main thread*/ + Math.Min(availableThreads, Environment.ProcessorCount - 1 /*keep one processor for the main thead*/);
}
//BUG a bug in iterationPerThread leads to sometimes not using all available threads. For example, if you have 5 items and 4 cores, only 3 cores will be used
int iterationPerThread = threadsToUseCount == 1
? elementsCount
: (int)Math.Ceiling((float)elementsCount / threadsToUseCount);
#if CURVY_SANITY_CHECKS
Assert.IsTrue(iterationPerThread * threadsToUseCount >= elementsCount);
#endif
int currentIndex = 0;
while (currentIndex < elementsCount)
{
int endEndex = Math.Min(currentIndex + iterationPerThread - 1, elementsCount - 1);
if (endEndex == elementsCount - 1)
for (int i = currentIndex; i <= endEndex; i++)
action(list.ElementAt(i), i, elementsCount);
else
{
QueuedCallback queuedCallback;
{
lock (queuedCallbackPool)
queuedCallback = queuedCallbackPool.GetItem();
}
LoopState loopState;
{
lock (loopStatePool)
loopState = loopStatePool.GetItem();
}
loopState.Set((short)currentIndex,
(short)endEndex,
list,
elementsCount,
action);
queuedCallback.State = loopState;
queuedCallback.Callback = handleLoopCallBack;
ThrowIfDisposed();
//Debug.LogWarning("New thread " + " from "+ loopState.StartIndex + " to " + loopState.EndIndex);
lock (_done)
_remainingWorkItems++;
ThreadPool.QueueUserWorkItem(handleWorkItemCallBack, queuedCallback);
}
currentIndex = endEndex + 1;
}
WaitAll(-1, false);
}
bool WaitAll(int millisecondsTimeout, bool exitContext)
{
ThrowIfDisposed();
DoneWorkItem();
bool rv = _done.WaitOne(millisecondsTimeout, exitContext);
lock (_done)
{
if (rv)
{
_remainingWorkItems = 1;
_done.Reset();
}
else
_remainingWorkItems++;
}
return rv;
}
private void ThrowIfDisposed()
{
if (_done == null)
throw new ObjectDisposedException(GetType().Name);
}
private void DoneWorkItem()
{
lock (_done)
{
--_remainingWorkItems;
if (_remainingWorkItems == 0)
_done.Set();
}
}
public void Dispose()
{
if (_done != null)
{
((IDisposable)_done).Dispose();
_done = null;
}
}
#else
public ThreadPoolWorker()
{
}
[Obsolete("Use ParallelFor(Action action, IEnumerable list) instead")]
public void ParralelFor(Action action, List list)
{
for (var i = 0; i < list.Count; i++)
action(list[i]);
}
public void ParallelFor(Action action, IEnumerable list)
{
ParallelFor(action, list, list.Count());
}
public void ParallelFor(Action action, IEnumerable list, int elementsCount)
{
for (int i = 0; i < elementsCount; i++)
action(list.ElementAt(i), i, elementsCount);
}
public void Dispose()
{
}
#endif
}
internal class SimplePool where T : new()
{
private readonly List freeItemsBackfield;
#if ENABLE_IL2CPP == false
private static readonly Func OptimizedInstantiator = Expression.Lambda>(
Expression.New(typeof(T))
).Compile();
#endif
public SimplePool(int preCreatedElementsCount)
{
freeItemsBackfield = new List();
for (int i = 0; i < preCreatedElementsCount; i++)
freeItemsBackfield.Add(
#if ENABLE_IL2CPP == false
OptimizedInstantiator.Invoke()
#else
new T()
#endif
);
}
public T GetItem()
{
T item;
if (freeItemsBackfield.Count == 0)
item =
#if ENABLE_IL2CPP == false
OptimizedInstantiator.Invoke()
#else
new T()
#endif
;
else
{
int lastIndex = freeItemsBackfield.Count - 1;
item = freeItemsBackfield[lastIndex];
freeItemsBackfield.RemoveAt(lastIndex);
}
return item;
}
public void ReleaseItem(T item)
{
freeItemsBackfield.Add(item);
}
}
#if !UNITY_WSA && !UNITY_WEBGL
class QueuedCallback
{
public WaitCallback Callback;
public object State;
}
class LoopState
{
public short StartIndex { get; private set; }
public short EndIndex { get; private set; }
public IEnumerable Items { get; private set; }
public int ItemsCount { get; private set; }
public Action Action { get; private set; }
public LoopState()
{
}
public LoopState(short startIndex, short endIndex, IEnumerable items, int itemsCount, Action action)
{
Set(startIndex, endIndex, items, itemsCount, action);
}
public void Set(short startIndex, short endIndex, IEnumerable items, int itemsCount, Action action)
{
StartIndex = startIndex;
EndIndex = endIndex;
Items = items;
ItemsCount = itemsCount;
Action = action;
}
}
#endif
///
/// This class is not very optimized. For better performance, use the generic version of ThreadPoolWorker instead
///
#if !UNITY_WSA && !UNITY_WEBGL
[Obsolete("Use ThreadPoolWorker instead")]
public class ThreadPoolWorker : IDisposable
{
private int _remainingWorkItems = 1;
private ManualResetEvent _done = new ManualResetEvent(false);
public void QueueWorkItem(WaitCallback callback)
{
QueueWorkItem(callback, null);
}
public void QueueWorkItem(Action act)
{
QueueWorkItem(act, null);
}
public void ParralelFor(Action action, List list)
{
int threadsToUseCount;
{
int availableThreads;
#if NET_4_6
int temp;
ThreadPool.GetAvailableThreads(out availableThreads, out temp);
#else
availableThreads = Environment.ProcessorCount - 1;
#endif
threadsToUseCount = 1 /*main thread*/ + Math.Min(availableThreads, Environment.ProcessorCount - 1 /*keep one processor for the main thead*/);
}
int iterationsCount = list.Count;
if (threadsToUseCount == 1 || iterationsCount == 1)
{
for (int i = 0; i < iterationsCount; i++)
{
action(list[i]);
}
}
else
{
int iterationPerThread = (int)Math.Ceiling((float)iterationsCount / threadsToUseCount);
int currentIndex = 0;
while (currentIndex < iterationsCount)
{
QueuedCallback queuedCallback = new QueuedCallback();
int endEndex = Math.Min(currentIndex + iterationPerThread, iterationsCount - 1);
LoopState loopState = new LoopState((short)currentIndex,
(short)endEndex,
list,
iterationsCount,
(item,
itemIndex,
itemsCount) => action(item));
queuedCallback.State = loopState;
queuedCallback.Callback = state =>
{
LoopState loopS = (LoopState)state;
for (int i = loopS.StartIndex; i <= loopS.EndIndex; i++)
{
loopS.Action(loopS.Items.ElementAt(i), i, iterationsCount);
}
};
QueueWorkItem(queuedCallback);
currentIndex = endEndex + 1;
}
}
}
private void QueueWorkItem(QueuedCallback callback)
{
ThrowIfDisposed();
lock (_done)
_remainingWorkItems++;
ThreadPool.QueueUserWorkItem(new WaitCallback(HandleWorkItem), callback);
}
public void QueueWorkItem(WaitCallback callback, object state)
{
QueuedCallback qc = new QueuedCallback();
qc.Callback = callback;
qc.State = state;
QueueWorkItem(qc);
}
public void QueueWorkItem(Action act, object state)
{
QueuedCallback qc = new QueuedCallback();
qc.Callback = (x => act());
qc.State = state;
QueueWorkItem(qc);
}
public bool WaitAll()
{
return WaitAll(-1, false);
}
public bool WaitAll(TimeSpan timeout, bool exitContext)
{
return WaitAll((int)timeout.TotalMilliseconds, exitContext);
}
public bool WaitAll(int millisecondsTimeout, bool exitContext)
{
ThrowIfDisposed();
DoneWorkItem();
bool rv = _done.WaitOne(millisecondsTimeout, exitContext);
lock (_done)
{
if (rv)
{
_remainingWorkItems = 1;
_done.Reset();
}
else
_remainingWorkItems++;
}
return rv;
}
private void HandleWorkItem(object state)
{
QueuedCallback qc = (QueuedCallback)state;
try
{
qc.Callback(qc.State);
}
finally
{
DoneWorkItem();
}
}
private void DoneWorkItem()
{
lock (_done)
{
--_remainingWorkItems;
if (_remainingWorkItems == 0)
_done.Set();
}
}
private void ThrowIfDisposed()
{
if (_done == null)
throw new ObjectDisposedException(GetType().Name);
}
public void Dispose()
{
if (_done != null)
{
((IDisposable)_done).Dispose();
_done = null;
}
}
}
#else
public class ThreadPoolWorker{}
#endif
}