source

다중 인수와 함께 multiprocessing pool.map을 사용하는 방법

lovecheck 2022. 12. 28. 21:48
반응형

다중 인수와 함께 multiprocessing pool.map을 사용하는 방법

Python 라이브러리에서, 의 변형은 있습니까?pool.map러러인 ?수 ?? ????

import multiprocessing

text = "test"

def harvester(text, case):
    X = case[0]
    text + str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text, case), case, 1)
    pool.close()
    pool.join()

여러 인수를 지원하는 pool.map의 변종이 있습니까?

Python 3.3에는 다음과 같은 방법이 있습니다.

#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    return a + b

def main():
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__=="__main__":
    freeze_support()
    main()

이전 버전의 경우:

#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support

def func(a, b):
    print a, b

def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)

def main():
    pool = Pool()
    a_args = [1,2,3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))

if __name__=="__main__":
    freeze_support()
    main()

산출량

1 1
2 1
3 1

여기서 및 를 사용하는 방법에 주목해 주세요.

Python 2.6에서는 @unutbu가 언급한 버그로 인해 사용할 수 없거나 유사한 기능을 사용할 수 없으므로 간단한 래퍼 기능func_star() 를 명시적으로 정의해야 합니다.제안하는 회피책도 참조해 주세요.

이에 대한 답은 버전 및 상황에 따라 달라집니다.Python의 최신 버전(3.3 이후)에 대한 가장 일반적인 답변은 아래에 J.F.에 의해 처음 설명되었습니다. 세바스찬.1일련의 인수 튜플을 받아들이는 메서드를 사용합니다.그런 다음 각 태플에서 인수를 자동으로 풀고 지정된 함수에 전달합니다.

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

이전 버전의 Python에서는 명시적으로 인수를 언팩하는 도우미 함수를 작성해야 합니다.「 」를 사용하고 with 돌려야 Pool(muon이 이 점을 지적해 준 덕분에) 콘텍스트 매니저로 변환됩니다.

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

경우에는 두 인수가 고정되어 때 두 번째 인수로도 사용할 수 .partialPython 2.7+는 Python 2.7+는 Python 2.7+를 사용합니다.

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

1. 이 답변의 대부분은 그의 답변에서 영감을 얻었고, 그 대신 받아들여졌어야 했다.하지만 이 책은 맨 위에 머물러 있기 때문에, 미래의 독자들을 위해 그것을 개선하는 것이 최선인 것 같다.

아래가 좋을 것 같습니다.

def multi_run_wrapper(args):
   return add(*args)

def add(x,y):
    return x+y

if __name__ == "__main__":
    from multiprocessing import Pool
    pool = Pool(4)
    results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
    print results

산출량

[3, 5, 7]

Python 3.3+ 사용pool.starmap():

from multiprocessing.dummy import Pool as ThreadPool 

def write(i, x):
    print(i, "---", x)

a = ["1","2","3"]
b = ["4","5","6"] 

pool = ThreadPool(2)
pool.starmap(write, zip(a,b)) 
pool.close() 
pool.join()

결과:

1 --- 4
2 --- 5
3 --- 6

zip.zip(a,b,c,d,e)

상수 값을 인수로 전달하려는 경우:

import itertools

zip(itertools.repeat(constant), a)

함수가 무언가를 반환해야 하는 경우:

results = pool.starmap(write, zip(a,b))

그러면 반환된 값이 포함된 목록이 나타납니다.

여러 인수를 사용하는 방법:

def f1(args):
    a, b, c = args[0] , args[1] , args[2]
    return a+b+c

if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool(4) 

    result1 = pool.map(f1, [ [1,2,3] ])
    print(result1)

J.F.에서 이터툴에 대해 배웠다. 세바스찬의 대답 나는 한 걸음 더 나아가서 글을 쓰기로 결심했다.parmap, "Parallelization", "Parallelization", "Parallelization"을 합니다.map ★★★★★★★★★★★★★★★★★」starmapPython 2.7 및 Python 3.2(및 그 이후)에서 임의의 수의 위치 인수를 사용할 수 있는 함수입니다.

인스톨

pip install parmap

병렬화 방법:

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

PyPI와 GitHub 저장소에 parmap을 업로드했습니다.

예를 들어 다음과 같이 질문에 대답할 수 있습니다.

import parmap

def harvester(case, text):
    X = case[0]
    text+ str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)

multiprocessingcalled pathos (주의: GitHub의 버전을 사용)가 필요 없습니다.starmap맵Python에 맵은 여러 할 수 있습니다.-- 맵 -- Python API --' -- -- -- 。

★★★★★★★★★★★★★★★★ pathos은, 「」에 을 실행할 .__main__block.는 약간의 입니다. 3.됩니다. Python 3.x로 하다

  Python 2.7.5 (default, Sep 30 2013, 20:15:49)
  [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
  Type "help", "copyright", "credits" or "license" for more information.
  >>> def func(a,b):
  ...     print a,b
  ...
  >>>
  >>> from pathos.multiprocessing import ProcessingPool
  >>> pool = ProcessingPool(nodes=4)
  >>> pool.map(func, [1,2,3], [1,1,1])
  1 1
  2 1
  3 1
  [None, None, None]
  >>>
  >>> # also can pickle stuff like lambdas
  >>> result = pool.map(lambda x: x**2, range(10))
  >>> result
  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  >>>
  >>> # also does asynchronous map
  >>> result = pool.amap(pow, [1,2,3], [4,5,6])
  >>> result.get()
  [1, 32, 729]
  >>>
  >>> # or can return a map iterator
  >>> result = pool.imap(pow, [1,2,3], [4,5,6])
  >>> result
  <processing.pool.IMapIterator object at 0x110c2ffd0>
  >>> list(result)
  [1, 32, 729]

pathos 수 있는 이 몇 있습니다.starmap.

>>> def add(*x):
...   return sum(x)
...
>>> x = [[1,2,3],[4,5,6]]
>>> import pathos
>>> import numpy as np
>>> # use ProcessPool's map and transposing the inputs
>>> pp = pathos.pools.ProcessPool()
>>> pp.map(add, *np.array(x).T)
[6, 15]
>>> # use ProcessPool's map and a lambda to apply the star
>>> pp.map(lambda x: add(*x), x)
[6, 15]
>>> # use a _ProcessPool, which has starmap
>>> _pp = pathos.pools._ProcessPool()
>>> _pp.starmap(add, x)
[6, 15]
>>>

Python 2를 위한 더 나은 솔루션:

from multiprocessing import Pool
def func((i, (a, b))):
    print i, a, b
    return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])

산출량

2 3 4

1 2 3

0 1 2

out[]:

[3, 5, 7]

다른 방법은 목록 목록을 하나의 인수 루틴에 전달하는 것입니다.

import os
from multiprocessing import Pool

def task(args):
    print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]

pool = Pool()

pool.map(task, [
        [1,2],
        [3,4],
        [5,6],
        [7,8]
    ])

그런 다음 원하는 방법으로 인수 목록을 작성할 수 있습니다.

좋은 방법은 손으로 포장 기능을 쓰는 대신 데코레이터를 사용하는 것입니다.특히 매핑해야 할 함수가 많을 때는 데코레이터가 모든 함수에 대해 래퍼를 쓰는 것을 피함으로써 시간을 절약할 수 있습니다.보통 장식된 함수는 고를 수 없지만functools피할 수 있을 것 같아요자세한 내용은 여기를 참조하십시오.

다음은 예를 제시하겠습니다.

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y

그런 다음 zip 인수를 사용하여 매핑할 수 있습니다.

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()

물론 Python 3 ( > = 3 . 3 )에서는, 다른 회답에 기재되어 있듯이, 항상 사용할 수 있습니다.

다음 두 가지 함수를 사용하여 각 새로운 함수에 대해 래퍼를 쓰지 않도록 할 수 있습니다.

import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))

기능을 사용합니다.function arg_0,arg_1 ★★★★★★★★★★★★★★★★★」arg_2음음음같 뭇매하다

pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()

또 다른 간단한 방법은 함수 매개변수를 태플로 묶은 다음 전달해야 하는 매개변수를 태플로도 묶는 것입니다.이것은 대량의 데이터를 취급하는 경우에는 이상적이지 않을 수 있습니다.각 태플마다 복사를 할 수 있을 것 같아요.

from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

출력을 임의의 순서로 제공합니다.

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]

IMHO가 다른 어떤 답변보다 심플하고 우아하다는 것을 알 수 있는 또 다른 방법이 있습니다.

이 프로그램에는 다음 두 가지 파라미터를 사용하여 출력하고 합계를 출력하는 기능이 있습니다.

import multiprocessing

def main():

    with multiprocessing.Pool(10) as pool:
        params = [ (2, 2), (3, 3), (4, 4) ]
        pool.starmap(printSum, params)
    # end with

# end function

def printSum(num1, num2):
    mySum = num1 + num2
    print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function

if __name__ == '__main__':
    main()

출력은 다음과 같습니다.

num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8

자세한 내용은 다음 python 문서를 참조하십시오.

https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool

특히 다음 웹 사이트를 꼭 확인하시기 바랍니다.starmap기능.

Python 3.6을 사용하고 있는데 이전 Python 버전에서도 동작할 수 있을지 모르겠습니다.

왜 이 문서에는 이런 간단한 예가 없는지는 잘 모르겠습니다.

Python 3.4.4에서 multiprocessing.get_context()를 사용하여 여러 시작 메서드를 사용하는 컨텍스트 개체를 얻을 수 있습니다.

import multiprocessing as mp

def foo(q, h, w):
    q.put(h + ' ' + w)
    print(h + ' ' + w)

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,'hello', 'world'))
    p.start()
    print(q.get())
    p.join()

아니면 그냥 대체해서

pool.map(harvester(text, case), case, 1)

포함:

pool.apply_async(harvester(text, case), case, 1)

공식 문서에는 하나의 반복 가능한 주장만 지원한다고 명시되어 있습니다.이러한 경우 apply_async를 사용합니다.당신의 경우, 저는 다음을 할 것입니다.

from multiprocessing import Process, Pool, Manager

text = "test"
def harvester(text, case, q = None):
 X = case[0]
 res = text+ str(X)
 if q:
  q.put(res)
 return res


def block_until(q, results_queue, until_counter=0):
 i = 0
 while i < until_counter:
  results_queue.put(q.get())
  i+=1

if __name__ == '__main__':
 pool = multiprocessing.Pool(processes=6)
 case = RAW_DATASET
 m = Manager()
 q = m.Queue()
 results_queue = m.Queue() # when it completes results will reside in this queue
 blocking_process = Process(block_until, (q, results_queue, len(case)))
 blocking_process.start()
 for c in case:
  try:
   res = pool.apply_async(harvester, (text, case, q = None))
   res.get(timeout=0.1)
  except:
   pass
 blocking_process.join()

여기에는 많은 해답이 있지만, 어떤 버전에서도 동작하는 Python 2/3 호환 코드를 제공하는 것은 없는 것 같습니다.코드만 작동시키려면 Python 버전 중 하나에서 작동합니다.

# For python 2/3 compatibility, define pool context manager
# to support the 'with' statement in Python 2
if sys.version_info[0] == 2:
    from contextlib import contextmanager
    @contextmanager
    def multiprocessing_context(*args, **kwargs):
        pool = multiprocessing.Pool(*args, **kwargs)
        yield pool
        pool.terminate()
else:
    multiprocessing_context = multiprocessing.Pool

그 후 일반 Python 3의 멀티프로세싱을 원하는 방식으로 사용할 수 있습니다.예를 들어 다음과 같습니다.

def _function_to_run_for_each(x):
       return x.lower()
with multiprocessing_context(processes=3) as pool:
    results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim'])    print(results)

Python 2 또는 Python 3에서 동작합니다.

text = "test"

def unpack(args):
    return args[0](*args[1:])

def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    # args is a list of tuples 
    # with the function to execute as the first item in each tuple
    args = [(harvester, text, c) for c in case]
    # doing it this way, we can pass any function
    # and we don't need to define a wrapper for each different function
    # if we need to use more than one
    pool.map(unpack, args)
    pool.close()
    pool.join()

다음 예에서는 pool.imap fork에서 사용되는1개의 인수함수에 여러 인수를 전달하기 위해 사용하는 루틴을 보여 줍니다.

from multiprocessing import Pool

# Wrapper of the function to map:
class makefun:
    def __init__(self, var2):
        self.var2 = var2
    def fun(self, i):
        var2 = self.var2
        return var1[i] + var2

# Couple of variables for the example:
var1 = [1, 2, 3, 5, 6, 7, 8]
var2 = [9, 10, 11, 12]

# Open the pool:
pool = Pool(processes=2)

# Wrapper loop
for j in range(len(var2)):
    # Obtain the function to map
    pool_fun = makefun(var2[j]).fun

    # Fork loop
    for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0):
        print(var1[i], '+' ,var2[j], '=', value)

# Close the pool
pool.close()

이것도 다른 옵션일 수 있습니다.비결은 에 있습니다.wrapper전달된 다른 함수를 반환하는 함수pool.map다음 코드는 입력 배열을 읽고, 그 안에 있는 각 요소(고유한)에 대해 해당 요소가 배열에 표시되는 횟수(수)를 반환합니다. 예를 들어 입력이 다음과 같은 경우

np.eye(3) = [ [1. 0. 0.]
              [0. 1. 0.]
              [0. 0. 1.]]

0은 6회, 1은 3회 표시됩니다.

import numpy as np
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import cpu_count


def extract_counts(label_array):
    labels = np.unique(label_array)
    out = extract_counts_helper([label_array], labels)
    return out

def extract_counts_helper(args, labels):
    n = max(1, cpu_count() - 1)
    pool = ThreadPool(n)
    results = {}
    pool.map(wrapper(args, results), labels)
    pool.close()
    pool.join()
    return results

def wrapper(argsin, results):
    def inner_fun(label):
        label_array = argsin[0]
        counts = get_label_counts(label_array, label)
        results[label] = counts
    return inner_fun

def get_label_counts(label_array, label):
    return sum(label_array.flatten() == label)

if __name__ == "__main__":
    img = np.ones([2,2])
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")
           
    img = np.eye(3)
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")
    
    img = np.random.randint(5, size=(3, 3))
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")

다음 정보를 얻을 수 있습니다.

input array: 
 [[1. 1.]
 [1. 1.]]
label counts:  {1.0: 4}
========
input array: 
 [[1. 0. 0.]
 [0. 1. 0.]
 [0. 0. 1.]]
label counts:  {0.0: 6, 1.0: 3}
========
input array: 
 [[4 4 0]
 [2 4 3]
 [2 3 1]]
label counts:  {0: 1, 1: 1, 2: 2, 3: 2, 4: 3}
========
import time
from multiprocessing import Pool


def f1(args):
    vfirst, vsecond, vthird = args[0] , args[1] , args[2]
    print(f'First Param: {vfirst}, Second value: {vsecond} and finally third value is: {vthird}')
    pass


if __name__ == '__main__':
    p = Pool()
    result = p.map(f1, [['Dog','Cat','Mouse']])
    p.close()
    p.join()
    print(result)

모든 인수를 튜플 배열로 저장합니다.

이 예에서는 보통 함수를 다음과 같이 호출합니다.

def mainImage(fragCoord: vec2, iResolution: vec3, iTime: float) -> vec3:

대신 1개의 태플을 전달하고 인수를 언팩합니다.

def mainImage(package_iter) -> vec3:
    fragCoord = package_iter[0]
    iResolution = package_iter[1]
    iTime = package_iter[2]

사전에 루프를 사용하여 태플을 구축합니다.

package_iter = []
iResolution = vec3(nx, ny, 1)
for j in range((ny-1), -1, -1):
    for i in range(0, nx, 1):
        fragCoord: vec2 = vec2(i, j)
        time_elapsed_seconds = 10
        package_iter.append((fragCoord, iResolution, time_elapsed_seconds))

그런 다음 tuples 배열을 전달하여 맵을 사용하여 모두 실행합니다.

array_rgb_values = []

with concurrent.futures.ProcessPoolExecutor() as executor:
    for val in executor.map(mainImage, package_iter):
        fragColor = val
        ir = clip(int(255* fragColor.r), 0, 255)
        ig = clip(int(255* fragColor.g), 0, 255)
        ib = clip(int(255* fragColor.b), 0, 255)

        array_rgb_values.append((ir, ig, ib))

이 Python을 가지고 있다는 것을 .* ★★★★★★★★★★★★★★★★★」**★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★

또한 낮은 수준의 다중 처리 라이브러리보다 높은 수준의 라이브러리 동시 미래를 사용하는 것이 좋습니다.

Python 2의 경우 이 트릭을 사용할 수 있습니다.

def fun(a, b):
    return a + b

pool = multiprocessing.Pool(processes=6)
b = 233
pool.map(lambda x:fun(x, b), range(1000))

언급URL : https://stackoverflow.com/questions/5442910/how-to-use-multiprocessing-pool-map-with-multiple-arguments

반응형