proxy: add experimental async mode
This commit is contained in:
@@ -13,6 +13,7 @@ AI scrapers are everywhere. This will stop them. `robots.txt` won't.
|
||||
- Easy configuration in jsonc
|
||||
- Support for Cloudflare
|
||||
- Support for IP-Range based rules (both ipv4 and ipv6)
|
||||
- Support for async (multithreaded) request handling
|
||||
- Minimal. The waiting page is tiny and light on network usage.
|
||||
|
||||
### Planned features
|
||||
|
||||
@@ -38,5 +38,13 @@
|
||||
"exclude_regex": ".*/commit/.*",
|
||||
"action_on_exclude": "DENY"
|
||||
}
|
||||
]
|
||||
],
|
||||
|
||||
// experimental settings. Use with caution!
|
||||
"experimental": {
|
||||
// If enabled, each request to the proxy is handled on its own thread.
// This can increase the proxy's throughput, especially when many requests
// arrive at once.
|
||||
"async_proxy": false
|
||||
}
|
||||
}
|
||||
@@ -45,6 +45,10 @@ class CConfig {
|
||||
bool trace_logging = false;
|
||||
std::vector<SIPRangeConfig> ip_configs;
|
||||
int default_challenge_difficulty = 4;
|
||||
|
||||
struct {
|
||||
bool async_proxy = false;
|
||||
} experimental;
|
||||
} m_config;
|
||||
|
||||
struct {
|
||||
|
||||
@@ -54,19 +54,6 @@ static std::string generateToken() {
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
// Allocates and configures the shared upstream HTTP client.
// Must be paired with a later call to finish(), which releases it.
void CServerHandler::init() {
    // Build the client options up front so the configuration reads as a unit.
    auto clientOptions = Pistache::Http::Experimental::Client::options()
                             .maxConnectionsPerHost(32)
                             .maxResponseSize(g_pConfig->m_config.max_request_size)
                             .threads(4);

    m_client = new Pistache::Http::Experimental::Client();
    m_client->init(clientOptions);
}
|
||||
|
||||
// Shuts down and frees the upstream client. Safe to call when init() was
// never run (or finish() was already called): it is a no-op on a null client.
void CServerHandler::finish() {
    if (m_client) {
        m_client->shutdown();
        delete m_client;
        m_client = nullptr;
    }
}
|
||||
|
||||
std::string CServerHandler::fingerprintForRequest(const Pistache::Http::Request& req) {
|
||||
const auto HEADERS = req.headers();
|
||||
std::shared_ptr<const Pistache::Http::Header::AcceptEncoding> acceptEncodingHeader;
|
||||
@@ -373,11 +360,47 @@ void CServerHandler::serveStop(const Pistache::Http::Request& req, Pistache::Htt
|
||||
}
|
||||
|
||||
// Entry point for forwarding a request upstream. Dispatches to the threaded
// implementation when the experimental async_proxy option is set, otherwise
// serves the request synchronously on the calling thread.
void CServerHandler::proxyPass(const Pistache::Http::Request& req, Pistache::Http::ResponseWriter& response) {
    const bool ASYNC = g_pConfig->m_config.experimental.async_proxy;

    if (ASYNC)
        proxyPassAsync(req, response);
    else
        proxyPassInternal(req, response);
}
|
||||
|
||||
// Serves one proxied request on a dedicated, detached thread (experimental
// "async_proxy" mode). The request is tracked in m_asyncProxyQueue while in
// flight; the worker removes its own entry once proxyPassInternal() returns.
void CServerHandler::proxyPassAsync(const Pistache::Http::Request& req, Pistache::Http::ResponseWriter& response) {
    // Construct outside the lock: std::make_shared throws std::bad_alloc on
    // failure instead of returning null, so the old post-insert null-check was
    // unreachable, and building here keeps the critical section minimal.
    auto proxiedRequest = std::make_shared<SProxiedRequest>(req, response);

    {
        std::lock_guard<std::mutex> lg(*m_asyncProxyQueue.queueMutex);
        m_asyncProxyQueue.queue.emplace_back(proxiedRequest);
        Debug::log(TRACE, "proxyPassAsync: new request, queue size {}", m_asyncProxyQueue.queue.size());
    }

    // TODO: add an option to limit the amount of threads (akin to a Java ThreadPool iirc it was called)
    // NOTE(review): the detached thread captures `this`; the handler must
    // outlive every in-flight request — confirm the shutdown order guarantees it.
    proxiedRequest->requestThread = std::thread([proxiedRequest, this]() {
        proxyPassInternal(proxiedRequest->req, proxiedRequest->response, true);

        // Drop the queue entry; the lambda's captured shared_ptr keeps the
        // struct alive until this worker finishes.
        std::lock_guard<std::mutex> lg(*m_asyncProxyQueue.queueMutex);
        std::erase(m_asyncProxyQueue.queue, proxiedRequest);
        Debug::log(TRACE, "proxyPassAsync: request done, queue size {}", m_asyncProxyQueue.queue.size());
    });
    proxiedRequest->requestThread.detach();
}
|
||||
|
||||
void CServerHandler::proxyPassInternal(const Pistache::Http::Request& req, Pistache::Http::ResponseWriter& response, bool async) {
|
||||
const std::string FORWARD_ADDR = g_pConfig->m_config.forward_address;
|
||||
|
||||
Debug::log(TRACE, "Method ({}): Forwarding to {}", (uint32_t)req.method(), FORWARD_ADDR + req.resource());
|
||||
|
||||
auto builder = m_client->prepareRequest(FORWARD_ADDR + req.resource(), req.method());
|
||||
Pistache::Http::Experimental::Client client;
|
||||
client.init(Pistache::Http::Experimental::Client::options().maxConnectionsPerHost(32).maxResponseSize(g_pConfig->m_config.max_request_size).threads(4));
|
||||
|
||||
auto builder = client.prepareRequest(FORWARD_ADDR + req.resource(), req.method());
|
||||
builder.body(req.body());
|
||||
for (auto it = req.cookies().begin(); it != req.cookies().end(); ++it) {
|
||||
builder.cookie(*it);
|
||||
@@ -391,9 +414,13 @@ void CServerHandler::proxyPass(const Pistache::Http::Request& req, Pistache::Htt
|
||||
continue;
|
||||
}
|
||||
|
||||
// FIXME: this should be possible once pistache allows for live reading of T-E?
|
||||
// for now, we wait for everything forever...
|
||||
// same as the todo further below, essentially
|
||||
if (HNAME == "Accept" && req.headers().getRaw(h->name()).value().contains("text/event-stream")) {
|
||||
Debug::log(ERR, "FIXME: text/event-stream not supported via the proxy (it would have to be async)");
|
||||
response.send(Pistache::Http::Code::Internal_Server_Error, "event-stream is not supported by checkpoint");
|
||||
Debug::log(TRACE, "FIXME: proxyPassInternal: text/event-stream not supported");
|
||||
response.send(Pistache::Http::Code::Internal_Server_Error, "Internal server error");
|
||||
client.shutdown();
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -446,4 +473,6 @@ void CServerHandler::proxyPass(const Pistache::Http::Request& req, Pistache::Htt
|
||||
});
|
||||
Pistache::Async::Barrier<Pistache::Http::Response> b(resp);
|
||||
b.wait_for(std::chrono::seconds(g_pConfig->m_config.proxy_timeout_sec));
|
||||
|
||||
client.shutdown();
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <pistache/http.h>

#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
|
||||
|
||||
// Giga hack, but we need it cuz the API is quite awkward and incomplete
|
||||
#define private public
|
||||
@@ -11,9 +12,6 @@ class CServerHandler : public Pistache::Http::Handler {
|
||||
|
||||
HTTP_PROTOTYPE(CServerHandler)
|
||||
|
||||
void init();
|
||||
void finish();
|
||||
|
||||
void onRequest(const Pistache::Http::Request& req, Pistache::Http::ResponseWriter response);
|
||||
|
||||
void onTimeout(const Pistache::Http::Request& request, Pistache::Http::ResponseWriter response);
|
||||
@@ -21,6 +19,8 @@ class CServerHandler : public Pistache::Http::Handler {
|
||||
private:
|
||||
void serveStop(const Pistache::Http::Request& req, Pistache::Http::ResponseWriter& response, int difficulty);
|
||||
void proxyPass(const Pistache::Http::Request& req, Pistache::Http::ResponseWriter& response);
|
||||
void proxyPassInternal(const Pistache::Http::Request& req, Pistache::Http::ResponseWriter& response, bool async = false);
|
||||
void proxyPassAsync(const Pistache::Http::Request& req, Pistache::Http::ResponseWriter& response);
|
||||
void challengeSubmitted(const Pistache::Http::Request& req, Pistache::Http::ResponseWriter& response);
|
||||
std::string fingerprintForRequest(const Pistache::Http::Request& req);
|
||||
std::string ipForRequest(const Pistache::Http::Request& req);
|
||||
@@ -38,5 +38,18 @@ class CServerHandler : public Pistache::Http::Handler {
|
||||
std::string error = "";
|
||||
};
|
||||
|
||||
Pistache::Http::Experimental::Client* m_client = nullptr;
|
||||
struct SProxiedRequest {
|
||||
SProxiedRequest(const Pistache::Http::Request& r, Pistache::Http::ResponseWriter& resp) : req(r), response(std::move(resp)) {
|
||||
;
|
||||
}
|
||||
|
||||
Pistache::Http::Request req;
|
||||
Pistache::Http::ResponseWriter response;
|
||||
std::thread requestThread;
|
||||
};
|
||||
|
||||
struct {
|
||||
std::vector<std::shared_ptr<SProxiedRequest>> queue;
|
||||
std::shared_ptr<std::mutex> queueMutex = std::make_shared<std::mutex>();
|
||||
} m_asyncProxyQueue;
|
||||
};
|
||||
@@ -88,7 +88,6 @@ int main(int argc, char** argv, char** envp) {
|
||||
opts.maxRequestSize(g_pConfig->m_config.max_request_size);
|
||||
endpoint->init(opts);
|
||||
auto handler = Pistache::Http::make_handler<CServerHandler>();
|
||||
handler->init();
|
||||
endpoint->setHandler(handler);
|
||||
|
||||
endpoint->serveThreaded();
|
||||
@@ -117,7 +116,6 @@ int main(int argc, char** argv, char** envp) {
|
||||
|
||||
Debug::log(LOG, "Shutting down, bye!");
|
||||
|
||||
handler->finish();
|
||||
endpoint->shutdown();
|
||||
endpoint = nullptr;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user