module Stream : sig .. end
Stream handles provide an abstraction of a duplex communication
channel. Stream.t is an abstract type; libuv provides three stream
implementations: Tcp.t, Pipe.t and Tty.t.
type t
include Uwt.Handle
val to_handle : t -> Uwt.Handle.t
val is_readable : t -> bool
val is_writable : t -> bool
val read_start : t -> cb:(Bytes.t uv_result -> unit) -> Int_result.unit
Read data from an incoming stream. The ~cb callback will be invoked
repeatedly until there is no more data to read or
Uwt.Stream.read_stop is called.
val read_start_exn : t -> cb:(Bytes.t uv_result -> unit) -> unit
val read_stop : t -> Int_result.unit
Stop reading data from the stream.
val read_stop_exn : t -> unit
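The callback-based interface above can be sketched as follows. This is a minimal illustration, not part of the library: count_bytes and on_done are hypothetical names, and the sketch assumes that uv_result exposes Ok and Error constructors and that both EOF and read failures arrive as Error.

```ocaml
(* Hypothetical helper: count the bytes arriving on a stream.
   Assumption: uv_result has the constructors Ok and Error, and
   EOF as well as read failures are delivered as Error. *)
let count_bytes (s : Uwt.Stream.t) ~(on_done : int -> unit) : unit =
  let total = ref 0 in
  Uwt.Stream.read_start_exn s ~cb:(function
    | Ok chunk ->
      (* The callback fires once per chunk of incoming data. *)
      total := !total + Bytes.length chunk
    | Error _ ->
      (* Stop explicitly (possibly redundant after EOF) and report. *)
      ignore (Uwt.Stream.read_stop s);
      on_done !total)
```

The _exn variant is used for starting so that a failure to start reading raises immediately; the result of read_stop is ignored here because the sketch has nothing useful to do with it.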
val read : ?pos:int -> ?len:int -> t -> buf:bytes -> int Lwt.t
There is currently no uv_read function in libuv, just uv_read_start
and uv_read_stop. This function is a convenience wrapper around them.
It calls read_stop internally if you don't continue reading
immediately. A result of zero indicates EOF.
Future libuv versions might add uv_read and uv_try_read functions
(this has been discussed several times). If these changes are merged,
Uwt.Stream.read will wrap them, even if there are small semantic
differences.
It is currently not possible to start several read threads in
parallel on the same stream; you must serialize the requests manually.
In the following example, t2 will fail with EBUSY:
let t1 = Uwt.Stream.read t ~buf:buf1 in
let t2 = Uwt.Stream.read t ~buf:buf2 in
(* ... *)
Calling the function with ~len:0 has dubious, system-dependent
semantics.
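One way to serialize reads manually is to start each read only after the previous one has resolved. The following is a minimal sketch under that assumption; read_all is a hypothetical helper name, and the chunk size of 4096 is arbitrary.

```ocaml
open Lwt.Infix

(* Hypothetical helper: read from a stream until EOF and return
   everything as a string. Only one read is in flight at a time,
   so the EBUSY situation described above cannot arise here. *)
let read_all (s : Uwt.Stream.t) : string Lwt.t =
  let chunk = Bytes.create 4096 in
  let acc = Buffer.create 4096 in
  let rec loop () =
    Uwt.Stream.read s ~buf:chunk >>= fun n ->
    if n = 0 then
      (* A zero result indicates EOF. *)
      Lwt.return (Buffer.contents acc)
    else begin
      (* Copy the n bytes just read before reusing the chunk. *)
      Buffer.add_subbytes acc chunk 0 n;
      loop ()
    end
  in
  loop ()
```

Reusing a single chunk buffer is safe in this sketch because the next read is not issued until the data of the previous one has been copied out.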
val read_ba : ?pos:int -> ?len:int -> t -> buf:buf -> int Lwt.t
val write_queue_size : t -> int
Returns the number of queued bytes waiting to be sent.
val try_write : ?pos:int -> ?len:int -> t -> buf:bytes -> Int_result.int
Write data to the stream, but don't queue a write request if the write can't be completed immediately.
val try_write_ba : ?pos:int -> ?len:int -> t -> buf:buf -> Int_result.int
val try_write_string : ?pos:int -> ?len:int -> t -> buf:string -> Int_result.int
val write : ?pos:int -> ?len:int -> t -> buf:bytes -> unit Lwt.t
Write data to the stream.
val write_string : ?pos:int -> ?len:int -> t -> buf:string -> unit Lwt.t
val write_ba : ?pos:int -> ?len:int -> t -> buf:buf -> unit Lwt.t
val write_raw : ?pos:int -> ?len:int -> t -> buf:bytes -> unit Lwt.t
Uwt.Stream.write is eager, like its counterparts in Lwt_unix. It
first calls Uwt.Stream.try_write internally to check whether it can
return immediately (without the overhead of creating a sleeping thread
and waking it up later). If it can't write everything instantly,
it calls Uwt.Stream.write_raw internally. Uwt.Stream.write_raw is
exposed here mainly so that unit tests can be written for it. But you
can also use it if your ~buf is very large or you know for some other
reason that try_write will fail.
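The eager strategy described above can be approximated in user code as follows. This is only a sketch of the idea, not the library's actual implementation: eager_write is a hypothetical name, and Uwt.Int_result.is_error and Uwt.Int_result.to_int are assumed to be available for inspecting the result of try_write.

```ocaml
(* Hypothetical re-creation of the eager write strategy.
   Assumption: Uwt.Int_result.is_error and Uwt.Int_result.to_int
   exist and behave as their names suggest. *)
let eager_write (s : Uwt.Stream.t) (buf : Bytes.t) : unit Lwt.t =
  let len = Bytes.length buf in
  let r = Uwt.Stream.try_write s ~buf in
  (* Treat an error as "nothing written"; write_raw will then
     report any persistent failure through the returned promise. *)
  let written =
    if Uwt.Int_result.is_error r then 0 else Uwt.Int_result.to_int r
  in
  if written = len then
    (* Everything went out immediately: no sleeping thread needed. *)
    Lwt.return_unit
  else
    (* Queue only the remainder of the buffer. *)
    Uwt.Stream.write_raw ~pos:written ~len:(len - written) s ~buf
```

In normal use Uwt.Stream.write already does this for you; the sketch is only meant to make the fast path and the fallback path visible.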
val write_raw_string : ?pos:int -> ?len:int -> t -> buf:string -> unit Lwt.t
val write_raw_ba : ?pos:int -> ?len:int -> t -> buf:buf -> unit Lwt.t
val try_writev : t -> Iovec_write.t list -> Int_result.int
Windows doesn't support writing multiple buffers with a single syscall for some HANDLEs (e.g. it is supported for TCP handles, but not for pipes). In that case uwt writes the buffers one by one.
If the number of buffers is greater than IOV_MAX, libuv already contains the necessary workarounds.
val writev : t -> Iovec_write.t list -> unit Lwt.t
See the comment on Uwt.Stream.try_writev! This function will fail with
Unix.EOPNOTSUPP on Windows for e.g. pipe handles.
val writev_emul : t -> Iovec_write.t list -> unit Lwt.t
Similar to Uwt.Stream.writev, but if passing several buffers at
once is not supported by the OS, the buffers will be written
one by one. As a consequence, you should not start several
Uwt.Stream.writev_emul threads in parallel; the writing order would be
surprising in this case. If you don't use Windows, this function is
identical to Uwt.Stream.writev.
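If several parts of a program need to write to the same stream, one possible way to avoid parallel writev_emul threads is to guard the stream with an Lwt_mutex. The sketch below assumes exactly that design; make_serialized_writer is a hypothetical helper name.

```ocaml
(* Hypothetical helper: returns a write function for the stream s
   that never runs two writev_emul calls at the same time. *)
let make_serialized_writer (s : Uwt.Stream.t) =
  let lock = Lwt_mutex.create () in
  fun iovs ->
    (* The lock is released when the write resolves or fails, so
       the buffers of different calls are never interleaved. *)
    Lwt_mutex.with_lock lock (fun () -> Uwt.Stream.writev_emul s iovs)
```

Create one writer per stream and share it among all producers; creating a new writer for every call would give each call its own mutex and defeat the purpose.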
val writev_raw : t -> Iovec_write.t list -> unit Lwt.t
val listen : t ->
max:int -> cb:(t -> Int_result.unit -> unit) -> Int_result.unit
Start listening for incoming connections. ~max indicates the
number of connections the kernel might queue, same as listen(2).
When a new incoming connection is received, ~cb is called.
val listen_exn : t ->
max:int -> cb:(t -> Int_result.unit -> unit) -> unit
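A minimal sketch of wiring up the listen callback is shown below. listen_with and on_connection are hypothetical names, the backlog of 128 is an arbitrary choice, and Uwt.Int_result.is_error is assumed to be available for checking the status. Accepting the connection is left to the caller, since the accept functions belong to the concrete stream implementations and are not part of this module.

```ocaml
(* Hypothetical helper: start listening and dispatch each incoming
   connection to on_connection.
   Assumption: Uwt.Int_result.is_error exists. *)
let listen_with (server : Uwt.Stream.t)
    ~(on_connection : Uwt.Stream.t -> unit) : unit =
  Uwt.Stream.listen_exn server ~max:128 ~cb:(fun srv status ->
    if Uwt.Int_result.is_error status then
      (* The status reports a failed incoming connection. *)
      prerr_endline "listen: incoming connection failed"
    else
      (* srv is the listening handle; the caller accepts on it. *)
      on_connection srv)
```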
val shutdown : t -> unit Lwt.t
Shutdown the outgoing (write) side of a duplex stream. It waits for pending write requests to complete.
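A typical use is to send a final message and then close the write side. The following sketch uses only write_string and shutdown from this module; send_and_finish is a hypothetical helper name.

```ocaml
open Lwt.Infix

(* Hypothetical helper: write a final message, then shut down the
   outgoing side of the stream. The read side stays usable. *)
let send_and_finish (s : Uwt.Stream.t) (msg : string) : unit Lwt.t =
  Uwt.Stream.write_string s ~buf:msg >>= fun () ->
  Uwt.Stream.shutdown s
```

Because shutdown waits for pending write requests, sequencing it after the write is not strictly required for correctness in this sketch; it mainly ensures that a write error is reported before the shutdown is attempted.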
val set_blocking : t -> bool -> Int_result.unit
Just don't use this function. It will only cause trouble.