Timeout Function in Python 3

Have you ever implemented a function that has to stop its execution after certain period of time? It is not as easy as it sounds, is it? I had to develop such functionality for a customer who had a requirement that one activity shouldn’t take more than 180 seconds to execute. There are two approaches […]

by Veselin Pavlov

November 15, 2016

4 min read

timeout - Timeout Function in Python 3

Have you ever implemented a function that has to stop its execution after certain period of time? It is not as easy as it sounds, is it? I had to develop such functionality for a customer who had a requirement that one activity shouldn’t take more than 180 seconds to execute. There are two approaches to achieve this behavior. The first one is to use threads and the second one, to use processes. The second one is better in my opinion but let me first start with the threads.

Implementing timeout function with thread:

In order to implement the timeout function, we need one thread to execute the function and another to watch the time that it takes. When the time is over only the second thread knows it. If we could simply kill the function thread everything would work as expected but since they share the same execution context, we can’t. Threads can’t be killed so what we can do is to signal the other thread that it should stop. The drawback of this approach is that it can’t be used in all the cases. For example if we are using an external library inside that function, the execution might be stuck in a code that we don’t have access to. In this case we can’t guarantee that the function will stop exactly after the given period. But in most of the cases this approach is enough. In the first thread (the one that executes the function) we have to make regular checks if the time is over. We can use the Event object from the threading module in Python 3 to send a signal from one thread to another. Here is an example:

Example:

from threading import Thread, Event
import time


# Event object used to send signals from one thread to another
stop_event = Event()


def do_actions():
    """
    Function that should timeout after 5 seconds. It simply prints a number and waits 1 second.
    :return:
    """
    i = 0
    while True:
        i += 1
        print(i)
        time.sleep(1)

        # Here we make the check if the other thread sent a signal to stop execution.
        if stop_event.is_set():
            break


if __name__ == '__main__':
    # We create another Thread
    action_thread = Thread(target=do_actions)

    # Here we start the thread and we wait 5 seconds before the code continues to execute.
    action_thread.start()
    action_thread.join(timeout=5)

    # We send a signal that the other thread should stop.
    stop_event.set()

    print("Hey there! I timed out! You can do things after me!")

Result:
1
2
3
4
5
Hey there! I timed out! You can do things after me!

In this example the main thread waits 5 seconds before it sends a stop_event signal. This is implemented with the join method which purpose is to block the calling thread until the thread whose join() method is called is terminated or the period set in the timeout parameter is over. After the blocking goes off, the main thread sends the stop signal and the other thread is supposed to see it and stop. If we have a loop with many fast actions, this approach is appropriate but if we call some functions from external modules we can’t guarantee that after the 5 seconds, the thread will be able to see that it should stop. Fortunately Python provides the multiprocessing module, which allows us to create processes which can be killed.

Implementing timeout function with Process:

As I said earlier threads can’t be killed because they have shared memory and some resources like files, database connections might be left unreleased if we kill the threads forcefully. This is not true for processes. Each process has it’s own memory space. This allows us to kill it without worrying that it might leave some open resource. Below is the same example implemented with process:

Example:

from multiprocessing import Process
import time

def do_actions():
    """
    Function that should timeout after 5 seconds. It simply prints a number and waits 1 second.
    :return:
    """
    i = 0
    while True:
        i += 1
        print(i)
        time.sleep(1)

if __name__ == '__main__':
    # We create a Process
    action_process = Process(target=do_actions)

    # We start the process and we block for 5 seconds.
    action_process.start()
    action_process.join(timeout=5)

    # We terminate the process.
    action_process.terminate()
    print("Hey there! I timed out! You can do things after me!")

Result:
1
2
3
4
5
Hey there! I timed out! You can do things after me!

As you can see from the code we use the terminate function of the process to stop it. Keep in mind that the termination process might take some time so in both cases we can’t guarantee that it will take exactly 5 seconds to finish but at least with processes it will be closer.

Do you know another way to implement this behavior?

Dev Manager and Partner at Dreamix