Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/beast2
8 : //
9 :
10 : #ifndef BOOST_BEAST2_SERVER_HTTP_STREAM_HPP
11 : #define BOOST_BEAST2_SERVER_HTTP_STREAM_HPP
12 :
13 : #include <boost/beast2/detail/config.hpp>
14 : #include <boost/beast2/log_service.hpp>
15 : #include <boost/beast2/format.hpp>
16 : #include <boost/beast2/read.hpp>
17 : #include <boost/beast2/write.hpp>
18 : #include <boost/beast2/server/any_lambda.hpp>
19 : #include <boost/beast2/server/route_handler_asio.hpp>
20 : #include <boost/beast2/server/router_asio.hpp>
21 : #include <boost/beast2/error.hpp>
22 : #include <boost/beast2/detail/except.hpp>
23 : #include <boost/capy/application.hpp>
24 : #include <boost/http_proto/request_parser.hpp>
25 : #include <boost/http_proto/response.hpp>
26 : #include <boost/http_proto/serializer.hpp>
27 : #include <boost/http_proto/string_body.hpp>
28 : #include <boost/http_proto/server/basic_router.hpp>
29 : #include <boost/url/parse.hpp>
30 : #include <boost/asio/prepend.hpp>
31 :
32 : namespace boost {
33 : namespace beast2 {
34 :
35 : //------------------------------------------------
36 :
37 : /** An HTTP server stream which routes requests to handlers and sends responses.
38 :
39 : An object of this type wraps an asynchronous Boost.ASIO stream and implements
40 : a high level server connection which reads HTTP requests, routes them to
41 : handlers installed in a router, and sends the HTTP response.
42 :
43 : @par Requires
44 : `AsyncStream` must satisfy <em>AsyncReadStream</em> and <em>AsyncWriteStream</em>
45 :
46 : @tparam AsyncStream The type of asynchronous stream.
47 : */
48 : template<class AsyncStream>
49 : class http_stream
50 : : private http_proto::detacher::owner
51 : {
52 : public:
53 : /** Constructor.
54 :
55 : This initializes a new HTTP connection object that operates on
56 : the given stream, uses the specified router to dispatch incoming
57 : requests, and calls the supplied completion function when the
58 : connection closes or fails.
59 :
60 : Construction does not start any I/O; call @ref on_stream_begin when
61 : the stream is connected to the remote peer to begin reading
62 : requests and processing them.
63 :
64 : @param app The owning application, used to access shared services
65 : such as logging and protocol objects.
66 : @param stream The underlying asynchronous stream to read from
67 : and write to. The caller is responsible for maintaining its
68 : lifetime for the duration of the session.
69 : @param routes The router used to dispatch incoming HTTP requests.
70 : @param close_fn The function invoked when the connection is closed
71 : or an unrecoverable error occurs.
72 : */
73 : http_stream(
74 : capy::application& app,
75 : AsyncStream& stream,
76 : router_asio<AsyncStream&> routes,
77 : any_lambda<void(system::error_code)> close_fn);
78 :
79 : /** Called to start a new HTTP session
80 :
81 : The stream must be in a connected,
82 : correct state for a new session.
83 : */
84 : void on_stream_begin(http_proto::acceptor_config const& config);
85 :
86 : private:
87 : void do_read();
88 : void on_read(
89 : system::error_code ec,
90 : std::size_t bytes_transferred);
91 : void on_headers();
92 : void do_dispatch(http_proto::route_result rv = {});
93 : void do_respond(http_proto::route_result rv);
94 : void do_write();
95 : void on_write(
96 : system::error_code const& ec,
97 : std::size_t bytes_transferred);
98 : void on_complete();
99 : http_proto::resumer do_detach() override;
100 : void do_resume(http_proto::route_result const& ec) override;
101 : void do_close();
102 : void do_fail(core::string_view s,
103 : system::error_code const& ec);
104 : void clear() noexcept;
105 :
106 : protected:
107 0 : std::string id() const
108 : {
109 0 : return std::string("[") + std::to_string(id_) + "] ";
110 : }
111 :
112 : protected:
113 : struct resetter;
114 : section sect_;
115 : std::size_t id_ = 0;
116 : AsyncStream& stream_;
117 : router_asio<AsyncStream&> routes_;
118 : any_lambda<void(system::error_code)> close_;
119 : http_proto::acceptor_config const* pconfig_ = nullptr;
120 :
121 : using work_guard = asio::executor_work_guard<decltype(
122 : std::declval<AsyncStream&>().get_executor())>;
123 : std::unique_ptr<work_guard> pwg_;
124 : http_proto::Request req_;
125 : ResponseAsio<AsyncStream&> res_;
126 : };
127 :
128 : //------------------------------------------------
129 :
130 : // for exception safety
131 : template<class AsyncStream>
132 : struct http_stream<AsyncStream>::
133 : resetter
134 : {
135 : ~resetter()
136 : {
137 : if(clear_)
138 : owner_.clear();
139 : }
140 :
141 : explicit resetter(
142 : http_stream<AsyncStream>& owner) noexcept
143 : : owner_(owner)
144 : {
145 : }
146 :
147 : void accept()
148 : {
149 : clear_ = false;
150 : }
151 :
152 : private:
153 : http_stream<AsyncStream>& owner_;
154 : bool clear_ = true;
155 : };
156 :
157 : //------------------------------------------------
158 :
159 : template<class AsyncStream>
160 0 : http_stream<AsyncStream>::
161 : http_stream(
162 : capy::application& app,
163 : AsyncStream& stream,
164 : router_asio<AsyncStream&> routes,
165 : any_lambda<void(system::error_code)> close)
166 0 : : sect_(use_log_service(app).get_section("http_stream"))
167 0 : , id_(
168 0 : []() noexcept
169 : {
170 : static std::size_t n = 0;
171 0 : return ++n;
172 0 : }())
173 0 : , stream_(stream)
174 0 : , routes_(std::move(routes))
175 0 : , close_(close)
176 0 : , res_(stream_)
177 : {
178 0 : req_.parser = http_proto::request_parser(app);
179 :
180 0 : res_.serializer = http_proto::serializer(app);
181 0 : res_.detach = http_proto::detacher(*this);
182 0 : }
183 :
184 : // called to start a new HTTP session.
185 : // the connection must be in the correct state already.
186 : template<class AsyncStream>
187 : void
188 0 : http_stream<AsyncStream>::
189 : on_stream_begin(
190 : http_proto::acceptor_config const& config)
191 : {
192 0 : pconfig_ = &config;
193 :
194 0 : req_.parser.reset();
195 0 : res_.data.clear();
196 0 : do_read();
197 0 : }
198 :
199 : // begin reading the request
200 : template<class AsyncStream>
201 : void
202 0 : http_stream<AsyncStream>::
203 : do_read()
204 : {
205 0 : req_.parser.start();
206 :
207 0 : beast2::async_read(stream_, req_.parser,
208 0 : call_mf(&http_stream::on_read, this));
209 0 : }
210 :
211 : // called when the read operation completes
212 : template<class AsyncStream>
213 : void
214 0 : http_stream<AsyncStream>::
215 : on_read(
216 : system::error_code ec,
217 : std::size_t bytes_transferred)
218 : {
219 : (void)bytes_transferred;
220 :
221 0 : if(ec.failed())
222 0 : return do_fail("http_stream::on_read", ec);
223 :
224 0 : LOG_TRC(this->sect_)(
225 : "{} http_stream::on_read bytes={}",
226 : this->id(), bytes_transferred);
227 :
228 0 : BOOST_ASSERT(req_.parser.is_complete());
229 :
230 0 : on_headers();
231 : }
232 :
233 : // called to set up the response after reading the request
234 : template<class AsyncStream>
235 : void
236 0 : http_stream<AsyncStream>::
237 : on_headers()
238 : {
239 : // set up Request and Response objects
240 0 : res_.serializer.reset();
241 : // VFALCO HACK for now we make a copy of the message
242 0 : req_.message = req_.parser.get();
243 : //res_.message.set_version(req_.message.version());
244 0 : res_.message.set_start_line( // VFALCO WTF
245 : http_proto::status::ok, req_.message.version());
246 0 : res_.message.set_keep_alive(req_.message.keep_alive());
247 0 : res_.data.clear();
248 :
249 : // parse the URL
250 : {
251 0 : auto rv = urls::parse_uri_reference(req_.message.target());
252 0 : if(rv.has_error())
253 : {
254 : // error parsing URL
255 0 : res_.status(http_proto::status::bad_request);
256 0 : res_.set_body("Bad Request: " + rv.error().message());
257 0 : return do_respond(rv.error());
258 : }
259 :
260 0 : req_.url = rv.value();
261 : }
262 :
263 : // invoke handlers for the route
264 0 : do_dispatch();
265 : }
266 :
267 : // called to dispatch or resume the route
268 : template<class AsyncStream>
269 : void
270 0 : http_stream<AsyncStream>::
271 : do_dispatch(
272 : http_proto::route_result rv)
273 : {
274 0 : if(! rv.failed())
275 : {
276 0 : BOOST_ASSERT(! pwg_); // can't be detached
277 0 : rv = routes_.dispatch(
278 0 : req_.message.method(), req_.url, req_, res_);
279 : }
280 : else
281 : {
282 0 : rv = routes_.resume(req_, res_, rv);
283 : }
284 :
285 0 : do_respond(rv);
286 0 : }
287 :
288 : // called after obtaining a route result
289 : template<class AsyncStream>
290 : void
291 0 : http_stream<AsyncStream>::
292 : do_respond(
293 : http_proto::route_result rv)
294 : {
295 0 : BOOST_ASSERT(rv != http_proto::route::next_route);
296 :
297 0 : if(rv == http_proto::route::close)
298 : {
299 0 : return do_close();
300 : }
301 :
302 0 : if(rv == http_proto::route::complete)
303 : {
304 : // VFALCO what if the connection was closed or keep-alive=false?
305 : // handler sendt the response?
306 0 : BOOST_ASSERT(res_.serializer.is_done());
307 0 : return on_write(system::error_code(), 0);
308 : }
309 :
310 0 : if(rv == http_proto::route::detach)
311 : {
312 : // didn't call res.detach()?
313 0 : if(! pwg_)
314 0 : detail::throw_logic_error();
315 0 : return;
316 : }
317 :
318 0 : if(rv == http_proto::route::next)
319 : {
320 : // unhandled request
321 0 : auto const status = http_proto::status::not_found;
322 0 : res_.status(status);
323 : //res_.message.set_keep_alive(false); // VFALCO?
324 0 : res_.set_body(http_proto::to_string(status));
325 : }
326 0 : else if(rv != http_proto::route::send)
327 : {
328 : // error message of last resort
329 0 : BOOST_ASSERT(rv.failed());
330 0 : BOOST_ASSERT(! http_proto::is_route_result(rv));
331 0 : res_.status(http_proto::status::internal_server_error);
332 0 : std::string s;
333 0 : format_to(s, "An internal server error occurred: {}", rv.message());
334 0 : res_.message.set_keep_alive(false); // VFALCO?
335 0 : res_.set_body(s);
336 0 : }
337 :
338 0 : do_write();
339 : }
340 :
341 : // begin writing the response
342 : template<class AsyncStream>
343 : void
344 0 : http_stream<AsyncStream>::
345 : do_write()
346 : {
347 0 : BOOST_ASSERT(! res_.serializer.is_done());
348 0 : beast2::async_write(stream_, res_.serializer,
349 0 : call_mf(&http_stream::on_write, this));
350 0 : }
351 :
352 : // called when the write operation completes
353 : template<class AsyncStream>
354 : void
355 0 : http_stream<AsyncStream>::
356 : on_write(
357 : system::error_code const& ec,
358 : std::size_t bytes_transferred)
359 : {
360 : (void)bytes_transferred;
361 :
362 0 : if(ec.failed())
363 0 : return do_fail("http_stream::on_write", ec);
364 :
365 0 : BOOST_ASSERT(res_.serializer.is_done());
366 :
367 0 : LOG_TRC(this->sect_)(
368 : "{} http_stream::on_write bytes={}",
369 : this->id(), bytes_transferred);
370 :
371 0 : if(res_.message.keep_alive())
372 0 : return do_read();
373 :
374 0 : do_close();
375 : }
376 :
377 : template<class AsyncStream>
378 : auto
379 0 : http_stream<AsyncStream>::
380 : do_detach() ->
381 : http_proto::resumer
382 : {
383 0 : BOOST_ASSERT(stream_.get_executor().running_in_this_thread());
384 :
385 : // can't call twice
386 0 : BOOST_ASSERT(! pwg_);
387 0 : pwg_.reset(new work_guard(stream_.get_executor()));
388 :
389 : // VFALCO cancel timer
390 :
391 0 : return http_proto::resumer(*this);
392 : }
393 :
394 : // called by resume(rv)
395 : template<class AsyncStream>
396 : void
397 0 : http_stream<AsyncStream>::
398 : do_resume(
399 : http_proto::route_result const& rv)
400 : {
401 0 : asio::dispatch(
402 0 : stream_.get_executor(),
403 0 : [this, rv]
404 : {
405 0 : BOOST_ASSERT(pwg_.get() != nullptr);
406 0 : pwg_.reset();
407 :
408 0 : do_dispatch(rv);
409 : });
410 0 : }
411 :
412 : // called when a non-recoverable error occurs
413 : template<class AsyncStream>
414 : void
415 0 : http_stream<AsyncStream>::
416 : do_fail(
417 : core::string_view s, system::error_code const& ec)
418 : {
419 0 : LOG_TRC(this->sect_)("{}: {}", s, ec.message());
420 :
421 : // tidy up lingering objects
422 0 : req_.parser.reset();
423 0 : res_.serializer.reset();
424 :
425 0 : close_(ec);
426 0 : }
427 :
428 : // end the session
429 : template<class AsyncStream>
430 : void
431 0 : http_stream<AsyncStream>::
432 : do_close()
433 : {
434 0 : clear();
435 0 : close_({});
436 0 : }
437 :
438 : // clear everything, releasing transient objects
439 : template<class AsyncStream>
440 : void
441 0 : http_stream<AsyncStream>::
442 : clear() noexcept
443 : {
444 0 : req_.parser.reset();
445 0 : res_.serializer.reset();
446 0 : res_.message.clear();
447 0 : }
448 :
449 : } // beast2
450 : } // boost
451 :
452 : #endif
|