From 5893be77c39ef5213ec716f71e761174cc02c648 Mon Sep 17 00:00:00 2001 From: Vaxry Date: Tue, 15 Apr 2025 16:55:21 +0100 Subject: [PATCH] proxy: add experimental async mode --- README.md | 1 + example/config.jsonc | 10 ++++++- src/config/Config.hpp | 4 +++ src/core/Handler.cpp | 61 +++++++++++++++++++++++++++++++------------ src/core/Handler.hpp | 21 ++++++++++++--- src/main.cpp | 2 -- 6 files changed, 76 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index e48835b..40c975f 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ AI scrapers are everywhere. This will stop them. `robots.txt` won't. - Easy configuration in jsonc - Support for cloudflare - Support for IP-Range based rules (both ipv4 and ipv6) +- Support for async (multithreaded) request handling - Minimal. The waiting page is tiny and light on network usage. ### Planned features diff --git a/example/config.jsonc b/example/config.jsonc index 0cee3fd..51f538e 100644 --- a/example/config.jsonc +++ b/example/config.jsonc @@ -38,5 +38,13 @@ "exclude_regex": ".*/commit/.*", "action_on_exclude": "DENY" } - ] + ], + + // experimental settings. Use with caution! + "experimental": { + // If enabled, all requests to the proxy will create their own thread. + // this can increase the throughput of the proxy, especially when there are a lot of requests made + // all at once. + "async_proxy": false + } } \ No newline at end of file diff --git a/src/config/Config.hpp b/src/config/Config.hpp index e45778a..bb12083 100644 --- a/src/config/Config.hpp +++ b/src/config/Config.hpp @@ -45,6 +45,10 @@ class CConfig { bool trace_logging = false; std::vector ip_configs; int default_challenge_difficulty = 4; + + struct { + bool async_proxy = false; + } experimental; } m_config; struct { diff --git a/src/core/Handler.cpp b/src/core/Handler.cpp index aebd212..1cedb60 100644 --- a/src/core/Handler.cpp +++ b/src/core/Handler.cpp @@ -54,19 +54,6 @@ static std::string generateToken() { return ss.str(); } -void CServerHandler::init() { - m_client = new Pistache::Http::Experimental::Client(); - m_client->init(Pistache::Http::Experimental::Client::options().maxConnectionsPerHost(32).maxResponseSize(g_pConfig->m_config.max_request_size).threads(4)); -} - -void CServerHandler::finish() { - if (!m_client) - return; - m_client->shutdown(); - delete m_client; - m_client = nullptr; -} - std::string CServerHandler::fingerprintForRequest(const Pistache::Http::Request& req) { const auto HEADERS = req.headers(); std::shared_ptr acceptEncodingHeader; @@ -373,11 +360,47 @@ void CServerHandler::serveStop(const Pistache::Http::Request& req, Pistache::Htt } void CServerHandler::proxyPass(const Pistache::Http::Request& req, Pistache::Http::ResponseWriter& response) { + if (g_pConfig->m_config.experimental.async_proxy) { + proxyPassAsync(req, response); + return; + } + + proxyPassInternal(req, response); +} + +void CServerHandler::proxyPassAsync(const Pistache::Http::Request& req, Pistache::Http::ResponseWriter& response) { + std::shared_ptr proxiedRequest; + { + std::lock_guard lg(*m_asyncProxyQueue.queueMutex); + proxiedRequest = m_asyncProxyQueue.queue.emplace_back(std::make_shared(req, response)); + Debug::log(TRACE, "proxyPassAsync: new request, queue size {}", m_asyncProxyQueue.queue.size()); + } + + if (!proxiedRequest) { + Debug::log(ERR, "Couldn't create an async proxy request struct?"); + response.send(Pistache::Http::Code::Internal_Server_Error, "Internal Proxy Error"); + return; + } + + // TODO: add an option to limit the amount of threads (akin to a Java ThreadPool iirc it was called) + proxiedRequest->requestThread = std::thread([proxiedRequest, this]() { + proxyPassInternal(proxiedRequest->req, proxiedRequest->response, true); + std::lock_guard lg(*m_asyncProxyQueue.queueMutex); + std::erase(m_asyncProxyQueue.queue, proxiedRequest); + Debug::log(TRACE, "proxyPassAsync: request done, queue size {}", m_asyncProxyQueue.queue.size()); + }); + proxiedRequest->requestThread.detach(); +} + +void CServerHandler::proxyPassInternal(const Pistache::Http::Request& req, Pistache::Http::ResponseWriter& response, bool async) { const std::string FORWARD_ADDR = g_pConfig->m_config.forward_address; Debug::log(TRACE, "Method ({}): Forwarding to {}", (uint32_t)req.method(), FORWARD_ADDR + req.resource()); - auto builder = m_client->prepareRequest(FORWARD_ADDR + req.resource(), req.method()); + Pistache::Http::Experimental::Client client; + client.init(Pistache::Http::Experimental::Client::options().maxConnectionsPerHost(32).maxResponseSize(g_pConfig->m_config.max_request_size).threads(4)); + + auto builder = client.prepareRequest(FORWARD_ADDR + req.resource(), req.method()); builder.body(req.body()); for (auto it = req.cookies().begin(); it != req.cookies().end(); ++it) { builder.cookie(*it); @@ -391,9 +414,13 @@ void CServerHandler::proxyPass(const Pistache::Http::Request& req, Pistache::Htt continue; } + // FIXME: this should be possible once pistache allows for live reading of T-E? + // for now, we wait for everything forever... + // same as the todo further below, essentially if (HNAME == "Accept" && req.headers().getRaw(h->name()).value().contains("text/event-stream")) { - Debug::log(ERR, "FIXME: text/event-stream not supported via the proxy (it would have to be async)"); - response.send(Pistache::Http::Code::Internal_Server_Error, "event-stream is not supported by checkpoint"); + Debug::log(TRACE, "FIXME: proxyPassInternal: text/event-stream not supported"); + response.send(Pistache::Http::Code::Internal_Server_Error, "Internal server error"); + client.shutdown(); return; } @@ -446,4 +473,6 @@ void CServerHandler::proxyPass(const Pistache::Http::Request& req, Pistache::Htt }); Pistache::Async::Barrier b(resp); b.wait_for(std::chrono::seconds(g_pConfig->m_config.proxy_timeout_sec)); + + client.shutdown(); } \ No newline at end of file diff --git a/src/core/Handler.hpp b/src/core/Handler.hpp index 46cc2c4..ff6b71b 100644 --- a/src/core/Handler.hpp +++ b/src/core/Handler.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include // Giga hack, but we need it cuz the API is quite awkward and incomplete #define private public @@ -11,9 +12,6 @@ class CServerHandler : public Pistache::Http::Handler { HTTP_PROTOTYPE(CServerHandler) - void init(); - void finish(); - void onRequest(const Pistache::Http::Request& req, Pistache::Http::ResponseWriter response); void onTimeout(const Pistache::Http::Request& request, Pistache::Http::ResponseWriter response); @@ -21,6 +19,8 @@ class CServerHandler : public Pistache::Http::Handler { private: void serveStop(const Pistache::Http::Request& req, Pistache::Http::ResponseWriter& response, int difficulty); void proxyPass(const Pistache::Http::Request& req, Pistache::Http::ResponseWriter& response); + void proxyPassInternal(const Pistache::Http::Request& req, Pistache::Http::ResponseWriter& response, bool async = false); + void proxyPassAsync(const Pistache::Http::Request& req, Pistache::Http::ResponseWriter& response); void challengeSubmitted(const Pistache::Http::Request& req, Pistache::Http::ResponseWriter& response); std::string fingerprintForRequest(const Pistache::Http::Request& req); std::string ipForRequest(const Pistache::Http::Request& req); @@ -38,5 +38,18 @@ class CServerHandler : public Pistache::Http::Handler { std::string error = ""; }; - Pistache::Http::Experimental::Client* m_client = nullptr; + struct SProxiedRequest { + SProxiedRequest(const Pistache::Http::Request& r, Pistache::Http::ResponseWriter& resp) : req(r), response(std::move(resp)) { + ; + } + + Pistache::Http::Request req; + Pistache::Http::ResponseWriter response; + std::thread requestThread; + }; + + struct { + std::vector> queue; + std::shared_ptr queueMutex = std::make_shared(); + } m_asyncProxyQueue; }; \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 492d067..1270f18 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -88,7 +88,6 @@ int main(int argc, char** argv, char** envp) { opts.maxRequestSize(g_pConfig->m_config.max_request_size); endpoint->init(opts); auto handler = Pistache::Http::make_handler(); - handler->init(); endpoint->setHandler(handler); endpoint->serveThreaded(); @@ -117,7 +116,6 @@ int main(int argc, char** argv, char** envp) { Debug::log(LOG, "Shutting down, bye!"); - handler->finish(); endpoint->shutdown(); endpoint = nullptr;