source

파이썬에서 병렬 프로그래밍을 수행하는 방법은 무엇입니까?

lovecheck 2023. 6. 11. 10:57
반응형

파이썬에서 병렬 프로그래밍을 수행하는 방법은 무엇입니까?

C++의 경우 OpenMP를 사용하여 병렬 프로그래밍을 수행할 수 있지만 OpenMP는 Python에서 작동하지 않습니다.파이썬 프로그램의 일부를 병렬로 처리하려면 어떻게 해야 합니까?

코드의 구조는 다음과 같이 고려할 수 있습니다.

solve1(A)
solve2(B)

에▁where디solve1그리고.solve2두 개의 독립 함수입니다.실행 시간을 줄이기 위해 이런 종류의 코드를 순서가 아닌 병렬로 실행하는 방법은 무엇입니까?코드는 다음과 같습니다.

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break
            
        node1 = partition[0]
        node2 = partition[1]
    
        G = updateGraph(G, node1, node2)

        if i == 999:
            print "Maximum iteration reaches"
    print inneropt

에▁where디setinner그리고.setouter두 개의 독립적인 함수입니다.거기가 제가 평행선을 달리고 싶은 곳입니다.

멀티프로세싱 모듈을 사용할 수 있습니다.이 경우 처리 풀을 사용할 수 있습니다.

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

이렇게 하면 일반적인 작업을 수행할 수 있는 프로세스가 생성됩니다.우리가 통과하지 못했기 때문에.processes시스템의 각 CPU 코어에 대해 하나의 프로세스를 생성합니다.각 CPU 코어는 하나의 프로세스를 동시에 실행할 수 있습니다.

목록을 단일 함수에 매핑하려면 다음을 수행합니다.

args = [A, B]
results = pool.map(solve1, args)

GIL은 python 개체에 대한 모든 작업을 잠그기 때문에 스레드를 사용하지 마십시오.

이것은 레이와 함께 매우 우아하게 이루어질 수 있습니다.

예를병하다면사음함정합여야니의다해수를용하을려렬제화▁with로 함수를 .@ray.remote 식가장나, 그고서그불들러다니입들을리▁them다▁decor▁with니,로 불러옵니다..remote.

import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])

는 멀티프로세싱 모듈에 비해 여러 가지 장점이 있습니다.

  1. 시스템 클러스터뿐만 아니라 멀티코어 시스템에서도 동일한 코드가 실행됩니다.
  2. 프로세스는 공유 메모리제로 복사 직렬화를 통해 데이터를 효율적으로 공유합니다.
  3. 오류 메시지가 잘 전달됩니다.
  4. 이러한 함수 호출은 함께 구성될 수 있습니다.

    @ray.remote
    def f(x):
        return x + 1
    
    x_id = f.remote(1)
    y_id = f.remote(x_id)
    z_id = f.remote(y_id)
    ray.get(z_id)  # returns 4
    
  5. 원격으로 함수를 호출하는 것 외에도 클래스를 원격으로 행위자로 인스턴스화할 수 있습니다.

Ray는 제가 개발을 도와온 프레임워크입니다.

다른 사람들이 말했듯이 해결책은 여러 프로세스를 사용하는 것입니다.그러나 어떤 프레임워크가 더 적합한지는 여러 요인에 따라 다릅니다.이미 언급한 것들 외에도 charm4pympi4py가 있습니다(나는 charm4py의 개발자입니다).

작업자 풀 추상화를 사용하는 것보다 위의 예제를 구현하는 더 효율적인 방법이 있습니다.주 루프가 동일한 매개 변수를 보냅니다(전체 그래프 포함).G각으로 전달합니다 각 1000번의 반복 작업자에게 반복적으로 전달합니다.적어도 한 명의 작업자가 다른 프로세스에 상주하므로, 여기에는 인수를 복사하여 다른 프로세스로 보내는 작업이 포함됩니다.이는 물체의 크기에 따라 매우 비용이 많이 들 수 있습니다.대신 작업자가 상태를 저장하고 업데이트된 정보를 전송하도록 하는 것이 좋습니다.

예를 들어 charm4py에서는 다음과 같이 수행할 수 있습니다.

class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...

이 예에서는 작업자가 한 명만 필요합니다.메인 루프는 기능 중 하나를 실행하고 작업자가 다른 기능을 실행하도록 할 수 있습니다.하지만 제 코드는 몇 가지를 설명하는 데 도움이 됩니다.

  1. 작업자 A는 프로세스 0(메인 루프와 동일)에서 실행됩니다.하는 동안에result_a.get()결과를 기다리는 동안 차단되며 작업자 A가 동일한 프로세스에서 계산을 수행합니다.
  2. 인수는 동일한 프로세스에 있으므로 작업자 A를 참조하여 자동으로 전달됩니다(복사가 수반되지 않음).

Cython은 병렬 프로그래밍을 C++보다 조금 더 흥미롭게 만드는 Global Interpreter Lock을 사용합니다.

이 항목에는 과제에 대한 몇 가지 유용한 예와 설명이 있습니다.

리눅스에서 작업 세트를 사용하여 다중 코어 시스템에서 Python GIL(Global Interpreter Lock) 해결 방법을 선택하시겠습니까?

경우에 따라 Numba를 사용하여 루프를 자동으로 병렬화할 수 있지만 Python의 작은 하위 집합에서만 작동합니다.

from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s

안타깝게도 Numba는 Numpy 배열에서만 작동하고 다른 Python 객체에서는 작동하지 않는 것 같습니다.이론적으로 Python을 C++로 컴파일한 다음 Intel C++ 컴파일러를 사용하여 자동으로 병렬화하는 도 가능할 수 있지만, 아직 시도하지는 않았습니다.

사용할 수 있습니다.joblib병렬 계산 및 다중 처리를 수행하는 라이브러리.

from joblib import Parallel, delayed

함수를 만들 수 있습니다.foo병렬로 실행하고 다음 코드를 기반으로 병렬 처리를 수행합니다.

output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)

에▁where디num_cores에서 얻을 수 있습니다.multiprocessing라이브러리는 다음과 같습니다.

import multiprocessing

num_cores = multiprocessing.cpu_count()

개인수가 중 가▁the▁the▁▁use▁▁can,수습▁function사다▁over를 사용할 수 있습니다.partial에서 합니다.functools라이브러리는 다음과 같습니다.

from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)

몇 가지 를 들어 파이썬과 R 다중 처리에 대한 전체 설명을 여기에서 찾을 수 있습니다.

저는 항상 파이썬에서 병렬 처리를 처리하기 위해 '멀티프로세싱' 네이티브 라이브러리를 사용합니다.대기열의 프로세스 수를 제어하기 위해 공유 변수를 카운터로 사용합니다.다음 예제에서는 단순 프로세스의 병렬 실행 방식을 확인할 수 있습니다.

저는 사용하기 쉽도록 스크립트를 업데이트했습니다.기본적으로, 당신이 해야 할 일은 단지 그것을 무시하는 것입니다.process병렬로 실행할 함수가 있는 메서드입니다.예제를 참조하십시오. 절차는 매우 간단합니다.또는 모든 실행 로그 발생을 제거할 수도 있습니다.

시간이 나면 값을 반환하는 프로세스로 작업할 수 있도록 코드를 업데이트하겠습니다.

요구 사항들

user@host:~$ pip install coloredlogs==15.0.1

코드

병렬 처리 스크립트(복사하여 붙여넣기):

#!/usr/bin/env python
# encoding: utf-8

from multiprocessing import Manager, Pool, Value, cpu_count
from multiprocessing.pool import ThreadPool
from typing import Any, Iterator
from datetime import datetime
from logging import Logger
import coloredlogs
import logging
import time
import sys
import os


LOG_LEVEL = "DEBUG"


def get_logger(name: str = __name__, level: str = LOG_LEVEL) -> Logger:
    assert level in ("NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")

    # Setting-up the script logging:
    logging.basicConfig(
        stream=sys.stdout,
        format="%(asctime)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        level=level
    )

    logger = logging.getLogger(name)
    coloredlogs.install(level=level, logger=logger, isatty=True)

    return logger


class ParallelProcessing:
    """
    Parallel processing.

    References
    ----------
    [1] Class `ParallelProcessing`: https://stackoverflow.com/a/70464369/16109419

    Examples
    --------
    >>> class MyParallelProcessing(ParallelProcessing):
    >>>     def process(self, name: str) -> None:
    >>>         logger = get_logger()
    >>>         logger.info(f"Executing process: {name}...")
    >>>         time.sleep(5)
    >>>
    >>>
    >>> params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]
    >>> mpp = MyParallelProcessing()
    >>> mpp.run(args_list=params_list)
    """

    _n_jobs: int
    _waiting_time: int
    _queue: Value
    _logger: Logger

    def __init__(self, n_jobs: int = -1, waiting_time: int = 1):
        """
        Instantiates a parallel processing object to execute processes in parallel.

        Parameters
        ----------
        n_jobs: int
            Number of jobs.
        waiting_time: int
            Waiting time when jobs queue is full, e.g. `_queue.value` == `_n_jobs`.
        """
        self._n_jobs = n_jobs if n_jobs >= 0 else cpu_count()
        self._waiting_time = waiting_time if waiting_time >= 0 else 60*60
        self._logger = get_logger()

    def process(self, *args) -> None:
        """
        Abstract process that must be overridden.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        raise NotImplementedError("Process not defined ('NotImplementedError' exception).")

    def _execute(self, *args) -> None:
        """
        Run the process and remove it from the process queue by decreasing the queue process counter.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        self.process(*args)
        self._queue.value -= 1

    def _error_callback(self, result: Any) -> None:
        """
        Error callback.

        Parameters
        ----------
        result: Any
            Result from exceptions.
        """
        self._logger.error(result)
        os._exit(1)

    def run(self, args_list: Iterator[tuple], use_multithreading: bool = False) -> None:
        """
        Run processes in parallel.

        Parameters
        ----------
        args_list: Iterator[tuple]
            List of process parameters (`*args`).
        use_multithreading: bool
            Use multithreading instead multiprocessing.
        """
        manager = Manager()
        self._queue = manager.Value('i', 0)
        lock = manager.Lock()
        pool = Pool(processes=self._n_jobs) if not use_multithreading else ThreadPool(processes=self._n_jobs)

        start_time = datetime.now()

        with lock:  # Write-protecting the processes queue shared variable.
            for args in args_list:
                while True:
                    if self._queue.value < self._n_jobs:
                        self._queue.value += 1

                        # Running processes in parallel:
                        pool.apply_async(func=self._execute, args=args, error_callback=self._error_callback)

                        break
                    else:
                        self._logger.debug(f"Pool full ({self._n_jobs}): waiting {self._waiting_time} seconds...")
                        time.sleep(self._waiting_time)

        pool.close()
        pool.join()

        exec_time = datetime.now() - start_time
        self._logger.info(f"Execution time: {exec_time}")

사용 예:

class MyParallelProcessing(ParallelProcessing):
    def process(self, name: str) -> None:
        """
        Process to run in parallel (overrides abstract method).
        """
        logger = get_logger()
        logger.info(f"Executing process: {name}...")
        time.sleep(5)


def main() -> None:
    n_jobs = int(sys.argv[1])  # Number of jobs to run in parallel.
    params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]

    mpp = MyParallelProcessing(n_jobs=n_jobs)

    # Executing processes in parallel:
    mpp.run(args_list=params_list)


if __name__ == '__main__':
    main()

실행 및 출력

user@host:~$ python run.py 1
2021-12-23 12:41:51 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:51 MYMACHINE __mp_main__[12352] INFO Executing process: A...
2021-12-23 12:41:52 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:53 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:54 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:55 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:56 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __mp_main__[12352] INFO Executing process: B...
2021-12-23 12:41:58 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:59 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:00 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
...
2021-12-23 12:42:10 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:11 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __mp_main__[12352] INFO Executing process: E...
2021-12-23 12:42:13 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:14 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:15 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:16 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:18 MYMACHINE __mp_main__[12352] INFO Executing process: F...
2021-12-23 12:42:23 MYMACHINE __main__[24180] INFO Execution time: 0:00:31.274478
user@host:~$ python run.py 3
2021-12-23 12:33:59 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:33:59 MYMACHINE __mp_main__[19776] INFO Executing process: A...
2021-12-23 12:33:59 MYMACHINE __mp_main__[24632] INFO Executing process: B...
2021-12-23 12:33:59 MYMACHINE __mp_main__[15852] INFO Executing process: C...
2021-12-23 12:34:00 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:01 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:02 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:03 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:04 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:05 MYMACHINE __mp_main__[19776] INFO Executing process: D...
2021-12-23 12:34:05 MYMACHINE __mp_main__[24632] INFO Executing process: E...
2021-12-23 12:34:05 MYMACHINE __mp_main__[15852] INFO Executing process: F...
2021-12-23 12:34:10 MYMACHINE __main__[7628] INFO Execution time: 0:00:11.087672
user@host:~$ python run.py 6
2021-12-23 12:40:48 MYMACHINE __mp_main__[26312] INFO Executing process: A...
2021-12-23 12:40:48 MYMACHINE __mp_main__[11468] INFO Executing process: B...
2021-12-23 12:40:48 MYMACHINE __mp_main__[12000] INFO Executing process: C...
2021-12-23 12:40:48 MYMACHINE __mp_main__[19864] INFO Executing process: D...
2021-12-23 12:40:48 MYMACHINE __mp_main__[25356] INFO Executing process: E...
2021-12-23 12:40:48 MYMACHINE __mp_main__[14504] INFO Executing process: F...
2021-12-23 12:40:53 MYMACHINE __main__[1180] INFO Execution time: 0:00:05.295934

다른 답변에서 권장하는 라이브러리 또는 모듈의 요구 사항과 가정을 학습하는 데 시간을 투자할 수 없는 경우 다음 사항이 적합할 수 있습니다.

  1. 태스크의 개별 부분을 실행하기 위한 스크립트 옵션을 지정합니다.
  2. 실행할 준비가 되었을 때n부품을 병렬로 실행합니다.child = subprocess.Popen(args = [sys.argv[0], ...])추가 옵션 및/또는 매개 변수 파일에 부품 번호 및 기타 세부 정보 제공 및 통화child.wait()어린이 한 명당

상황을 더 하십시오.child.poll()child.wait().child.returncode 직도입니다.None.

큰 작업의 경우 새 프로세스를 시작하고 파일을 쓰고 읽는 데 드는 오버헤드가 최소화됩니다.많은 소규모 작업의 경우 작업자를 한 번만 시작한 다음 파이프나 소켓을 통해 작업자와 통신하기를 원하지만, 이 작업은 훨씬 더 많은 작업이며 교착 상태의 가능성을 피하기 위해 신중하게 수행되어야 합니다.이러한 상황에서는 다른 답변에서 권장하는 모듈을 사용하는 방법을 배우는 것이 좋습니다.

다음은 윈도우즈 환경에서 작동하는 완전한 예입니다. 비동기식 처리의 장점은 시간을 절약하는 것입니다.

import multiprocessing
import time
from multiprocessing import Pool, freeze_support
from multiprocessing import Pool


def f1(a):
    c = 0
    for i in range(0, 99999999):
        c = c + 1
    return 1


def f2(b):
    c = 0
    for i in range(0, 99999999):
        c = c + 1
    return 1

if __name__ == '__main__':

    pool = Pool(multiprocessing.cpu_count())
    result1 = pool.apply_async(f1, [0])
    result2 = pool.apply_async(f2, [9])
    freeze_support()
    t0 = time.time()
    answer1 = result1.get(timeout=10)
    answer2 = result2.get(timeout=10)
    print(time.time()-t0)
    t0 = time.time()
    aa = f1(1)
    bb = f2(2)
    print(time.time()-t0)

데이터 프레임을 Dask Dataframe으로 변환하면 병렬 컴퓨팅을 처리할 수 있습니다.

import dask.dataframe as dd
pdf = pd.Pandas({"A" : A, "B" : B})
ddf = dd.from_pandas(pdf, npartitions=3)
solve(ddf)

언급URL : https://stackoverflow.com/questions/20548628/how-to-do-parallel-programming-in-python

반응형