%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 2017-2022. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %% %% %CopyrightEnd% %% -module(gen_tcp_dist). %% %% This is an example of how to plug in an arbitrary distribution %% carrier for Erlang using distribution processes. %% %% This example uses gen_tcp for transportation of data, but %% you can use whatever underlying protocol you want as long %% as your implementation reliably delivers data chunks to the %% receiving VM in the order they were sent from the sending %% VM. %% %% This code is a rewrite of the lib/kernel/src/inet_tcp_dist.erl %% distribution implementation for TCP used by default. The default %% implementation uses distribution ports instead of distribution %% processes and is more efficient compared to this implementation. %% This example more or less gets the distribution processes %% in between the VM and the ports without any specific gain. %% -export([listen/1, accept/1, accept_connection/5, setup/5, close/1, select/1, is_node_name/1, address/0]). %% Optional -export([setopts/2, getopts/2]). %% internal exports -export([dist_cntrlr_setup/1, dist_cntrlr_input_setup/3, dist_cntrlr_tick_handler/1]). -export([accept_loop/2,do_accept/6,do_setup/6]). -import(error_logger,[error_msg/2]). -include_lib("kernel/include/net_address.hrl"). -include_lib("kernel/include/dist.hrl"). -include_lib("kernel/include/dist_util.hrl"). %% ------------------------------------------------------------ %% Select this protocol based on node name %% select(Node) => Bool %% ------------------------------------------------------------ select(Node) -> case split_node(atom_to_list(Node), $@, []) of [_, Host] -> case inet:getaddr(Host, inet) of {ok,_} -> true; _ -> false end; _ -> false end. %% ------------------------------------------------------------ %% Get the address family that this distribution uses %% ------------------------------------------------------------ address() -> get_tcp_address(). %% ------------------------------------------------------------ %% Create the listen socket, i.e. the port that this erlang %% node is accessible through. %% ------------------------------------------------------------ listen(Name) -> case do_listen([binary, {active, false}, {packet,2}, {reuseaddr, true}]) of {ok, Socket} -> TcpAddress = get_tcp_address(Socket), {_,Port} = TcpAddress#net_address.address, ErlEpmd = net_kernel:epmd_module(), case ErlEpmd:register_node(Name, Port) of {ok, Creation} -> {ok, {Socket, TcpAddress, Creation}}; Error -> Error end; Error -> Error end. do_listen(Options) -> {First,Last} = case application:get_env(kernel,inet_dist_listen_min) of {ok,N} when is_integer(N) -> case application:get_env(kernel, inet_dist_listen_max) of {ok,M} when is_integer(M) -> {N,M}; _ -> {N,N} end; _ -> {0,0} end, do_listen(First, Last, listen_options([{backlog,128}|Options])). do_listen(First,Last,_) when First > Last -> {error,eaddrinuse}; do_listen(First,Last,Options) -> case gen_tcp:listen(First, Options) of {error, eaddrinuse} -> do_listen(First+1,Last,Options); Other -> Other end. listen_options(Opts0) -> Opts1 = case application:get_env(kernel, inet_dist_use_interface) of {ok, Ip} -> [{ip, Ip} | Opts0]; _ -> Opts0 end, case application:get_env(kernel, inet_dist_listen_options) of {ok,ListenOpts} -> ListenOpts ++ Opts1; _ -> Opts1 end. %% ------------------------------------------------------------ %% Accepts new connection attempts from other Erlang nodes. %% ------------------------------------------------------------ accept(Listen) -> spawn_opt(?MODULE, accept_loop, [self(), Listen], [link, {priority, max}]). accept_loop(Kernel, Listen) -> ?trace("~p~n",[{?MODULE, accept_loop, self()}]), case gen_tcp:accept(Listen) of {ok, Socket} -> DistCtrl = spawn_dist_cntrlr(Socket), ?trace("~p~n",[{?MODULE, accept_loop, accepted, Socket, DistCtrl, self()}]), flush_controller(DistCtrl, Socket), gen_tcp:controlling_process(Socket, DistCtrl), flush_controller(DistCtrl, Socket), Kernel ! {accept,self(),DistCtrl,inet,tcp}, receive {Kernel, controller, Pid} -> call_ctrlr(DistCtrl, {supervisor, Pid}), Pid ! {self(), controller}; {Kernel, unsupported_protocol} -> exit(unsupported_protocol) end, accept_loop(Kernel, Listen); Error -> exit(Error) end. flush_controller(Pid, Socket) -> receive {tcp, Socket, Data} -> Pid ! {tcp, Socket, Data}, flush_controller(Pid, Socket); {tcp_closed, Socket} -> Pid ! {tcp_closed, Socket}, flush_controller(Pid, Socket) after 0 -> ok end. %% ------------------------------------------------------------ %% Accepts a new connection attempt from another Erlang node. %% Performs the handshake with the other side. %% ------------------------------------------------------------ accept_connection(AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) -> spawn_opt(?MODULE, do_accept, [self(), AcceptPid, DistCtrl, MyNode, Allowed, SetupTime], dist_util:net_ticker_spawn_options()). do_accept(Kernel, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) -> ?trace("~p~n",[{?MODULE, do_accept, self(), MyNode}]), receive {AcceptPid, controller} -> Timer = dist_util:start_timer(SetupTime), case check_ip(DistCtrl) of true -> HSData0 = hs_data_common(DistCtrl), HSData = HSData0#hs_data{kernel_pid = Kernel, this_node = MyNode, socket = DistCtrl, timer = Timer, this_flags = 0, allowed = Allowed}, dist_util:handshake_other_started(HSData); {false,IP} -> error_msg("** Connection attempt from " "disallowed IP ~w ** ~n", [IP]), ?shutdown(no_node) end end. %% we may not always want the nodelay behaviour %% for performance reasons nodelay() -> case application:get_env(kernel, dist_nodelay) of undefined -> {nodelay, true}; {ok, true} -> {nodelay, true}; {ok, false} -> {nodelay, false}; _ -> {nodelay, true} end. %% ------------------------------------------------------------ %% Setup a new connection to another Erlang node. %% Performs the handshake with the other side. %% ------------------------------------------------------------ setup(Node, Type, MyNode, LongOrShortNames,SetupTime) -> spawn_opt(?MODULE, do_setup, [self(), Node, Type, MyNode, LongOrShortNames, SetupTime], dist_util:net_ticker_spawn_options()). do_setup(Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) -> ?trace("~p~n",[{?MODULE, do_setup, self(), Node}]), [Name, Address] = splitnode(Node, LongOrShortNames), case inet:getaddr(Address, inet) of {ok, Ip} -> Timer = dist_util:start_timer(SetupTime), ErlEpmd = net_kernel:epmd_module(), case ErlEpmd:port_please(Name, Ip) of {port, TcpPort, Version} -> ?trace("port_please(~p) -> version ~p~n", [Node,Version]), dist_util:reset_timer(Timer), case gen_tcp:connect( Ip, TcpPort, connect_options([binary, {active, false}, {packet, 2}])) of {ok, Socket} -> DistCtrl = spawn_dist_cntrlr(Socket), call_ctrlr(DistCtrl, {supervisor, self()}), flush_controller(DistCtrl, Socket), gen_tcp:controlling_process(Socket, DistCtrl), flush_controller(DistCtrl, Socket), HSData0 = hs_data_common(DistCtrl), HSData = HSData0#hs_data{kernel_pid = Kernel, other_node = Node, this_node = MyNode, socket = DistCtrl, timer = Timer, this_flags = 0, other_version = Version, request_type = Type}, dist_util:handshake_we_started(HSData); _ -> %% Other Node may have closed since %% port_please ! ?trace("other node (~p) " "closed since port_please.~n", [Node]), ?shutdown(Node) end; _ -> ?trace("port_please (~p) " "failed.~n", [Node]), ?shutdown(Node) end; _Other -> ?trace("inet_getaddr(~p) " "failed (~p).~n", [Node,_Other]), ?shutdown(Node) end. connect_options(Opts) -> case application:get_env(kernel, inet_dist_connect_options) of {ok,ConnectOpts} -> ConnectOpts ++ Opts; _ -> Opts end. %% %% Close a socket. %% close(Listen) -> gen_tcp:close(Listen). %% If Node is illegal terminate the connection setup!! splitnode(Node, LongOrShortNames) -> case split_node(atom_to_list(Node), $@, []) of [Name|Tail] when Tail =/= [] -> Host = lists:append(Tail), case split_node(Host, $., []) of [_] when LongOrShortNames =:= longnames -> case inet:parse_address(Host) of {ok, _} -> [Name, Host]; _ -> error_msg("** System running to use " "fully qualified " "hostnames **~n" "** Hostname ~ts is illegal **~n", [Host]), ?shutdown(Node) end; L when length(L) > 1, LongOrShortNames =:= shortnames -> error_msg("** System NOT running to use fully qualified " "hostnames **~n" "** Hostname ~ts is illegal **~n", [Host]), ?shutdown(Node); _ -> [Name, Host] end; [_] -> error_msg("** Nodename ~p illegal, no '@' character **~n", [Node]), ?shutdown(Node); _ -> error_msg("** Nodename ~p illegal **~n", [Node]), ?shutdown(Node) end. split_node([Chr|T], Chr, Ack) -> [lists:reverse(Ack)|split_node(T, Chr, [])]; split_node([H|T], Chr, Ack) -> split_node(T, Chr, [H|Ack]); split_node([], _, Ack) -> [lists:reverse(Ack)]. %% ------------------------------------------------------------ %% Fetch local information about a Socket. %% ------------------------------------------------------------ get_tcp_address(Socket) -> {ok, Address} = inet:sockname(Socket), NetAddr = get_tcp_address(), NetAddr#net_address{address = Address}. get_tcp_address() -> {ok, Host} = inet:gethostname(), #net_address { host = Host, protocol = tcp, family = inet }. %% ------------------------------------------------------------ %% Do only accept new connection attempts from nodes at our %% own LAN, if the check_ip environment parameter is true. %% ------------------------------------------------------------ check_ip(DistCtrl) -> case application:get_env(check_ip) of {ok, true} -> case get_ifs(DistCtrl) of {ok, IFs, IP} -> check_ip(IFs, IP); _ -> ?shutdown(no_node) end; _ -> true end. get_ifs(DistCtrl) -> Socket = call_ctrlr(DistCtrl, socket), case inet:peername(Socket) of {ok, {IP, _}} -> case inet:getif(Socket) of {ok, IFs} -> {ok, IFs, IP}; Error -> Error end; Error -> Error end. check_ip([{OwnIP, _, Netmask}|IFs], PeerIP) -> case {inet_tcp:mask(Netmask, PeerIP), inet_tcp:mask(Netmask, OwnIP)} of {M, M} -> true; _ -> check_ip(IFs, PeerIP) end; check_ip([], PeerIP) -> {false, PeerIP}. is_node_name(Node) when is_atom(Node) -> case split_node(atom_to_list(Node), $@, []) of [_, _Host] -> true; _ -> false end; is_node_name(_Node) -> false. hs_data_common(DistCtrl) -> TickHandler = call_ctrlr(DistCtrl, tick_handler), Socket = call_ctrlr(DistCtrl, socket), RejectFlags = case init:get_argument(gen_tcp_dist_reject_flags) of {ok,[[Flags]]} -> list_to_integer(Flags); _ -> #hs_data{}#hs_data.reject_flags end, #hs_data{f_send = send_fun(), f_recv = recv_fun(), f_setopts_pre_nodeup = setopts_pre_nodeup_fun(), f_setopts_post_nodeup = setopts_post_nodeup_fun(), f_getll = getll_fun(), f_handshake_complete = handshake_complete_fun(), f_address = address_fun(), mf_setopts = setopts_fun(DistCtrl, Socket), mf_getopts = getopts_fun(DistCtrl, Socket), mf_getstat = getstat_fun(DistCtrl, Socket), mf_tick = tick_fun(DistCtrl, TickHandler), reject_flags = RejectFlags}. %%% ------------------------------------------------------------ %%% Distribution controller processes %%% ------------------------------------------------------------ %% %% There will be five parties working together when the %% connection is up: %% - The gen_tcp socket. Providing a tcp/ip connection %% to the other node. %% - The output handler. It will dispatch all outgoing %% traffic from the VM to the gen_tcp socket. This %% process is registered as distribution controller %% for this channel with the VM. %% - The input handler. It will dispatch all incoming %% traffic from the gen_tcp socket to the VM. This %% process is also the socket owner and receives %% incoming traffic using active-N. %% - The tick handler. Dispatches asynchronous tick %% requests to the socket. It executes on max priority %% since it is important to get ticks through to the %% other end. %% - The channel supervisor (provided by dist_util). It %% monitors traffic. Issue tick requests to the tick %% handler when no outgoing traffic is seen and bring %% the connection down if no incoming traffic is seen. %% This process also executes on max priority. %% %% These parties are linked together so should one %% of them fail, all of them are terminated and the %% connection is taken down. %% %% In order to avoid issues with lingering signal binaries %% we enable off-heap message queue data as well as fullsweep %% after 0. The fullsweeps will be cheap since we have more %% or less no live data. -define(DIST_CNTRL_COMMON_SPAWN_OPTS, [{message_queue_data, off_heap}, {fullsweep_after, 0}]). tick_fun(DistCtrl, TickHandler) -> fun (Ctrl) when Ctrl == DistCtrl -> TickHandler ! tick end. getstat_fun(DistCtrl, Socket) -> fun (Ctrl) when Ctrl == DistCtrl -> case inet:getstat(Socket, [recv_cnt, send_cnt, send_pend]) of {ok, Stat} -> split_stat(Stat,0,0,0); Error -> Error end end. split_stat([{recv_cnt, R}|Stat], _, W, P) -> split_stat(Stat, R, W, P); split_stat([{send_cnt, W}|Stat], R, _, P) -> split_stat(Stat, R, W, P); split_stat([{send_pend, P}|Stat], R, W, _) -> split_stat(Stat, R, W, P); split_stat([], R, W, P) -> {ok, R, W, P}. setopts_fun(DistCtrl, Socket) -> fun (Ctrl, Opts) when Ctrl == DistCtrl -> setopts(Socket, Opts) end. getopts_fun(DistCtrl, Socket) -> fun (Ctrl, Opts) when Ctrl == DistCtrl -> getopts(Socket, Opts) end. setopts(S, Opts) -> case [Opt || {K,_}=Opt <- Opts, K =:= active orelse K =:= deliver orelse K =:= packet] of [] -> inet:setopts(S,Opts); Opts1 -> {error, {badopts,Opts1}} end. getopts(S, Opts) -> inet:getopts(S, Opts). send_fun() -> fun (Ctrlr, Packet) -> call_ctrlr(Ctrlr, {send, Packet}) end. recv_fun() -> fun (Ctrlr, Length, Timeout) -> case call_ctrlr(Ctrlr, {recv, Length, Timeout}) of {ok, Bin} when is_binary(Bin) -> {ok, binary_to_list(Bin)}; Other -> Other end end. getll_fun() -> fun (Ctrlr) -> call_ctrlr(Ctrlr, getll) end. address_fun() -> fun (Ctrlr, Node) -> case call_ctrlr(Ctrlr, {address, Node}) of {error, no_node} -> %% No '@' or more than one '@' in node name. ?shutdown(no_node); Res -> Res end end. setopts_pre_nodeup_fun() -> fun (Ctrlr) -> call_ctrlr(Ctrlr, pre_nodeup) end. setopts_post_nodeup_fun() -> fun (Ctrlr) -> call_ctrlr(Ctrlr, post_nodeup) end. handshake_complete_fun() -> fun (Ctrlr, Node, DHandle) -> call_ctrlr(Ctrlr, {handshake_complete, Node, DHandle}) end. call_ctrlr(Ctrlr, Msg) -> Ref = erlang:monitor(process, Ctrlr), Ctrlr ! {Ref, self(), Msg}, receive {Ref, Res} -> erlang:demonitor(Ref, [flush]), Res; {'DOWN', Ref, process, Ctrlr, Reason} -> exit({dist_controller_exit, Reason}) end. %% %% The tick handler process writes a tick to the %% socket when it receives a 'tick' message from %% the connection supervisor. %% %% We are not allowed to block the connection %% superviser when writing a tick and we also want %% the tick to go through even during a heavily %% loaded system. gen_tcp does not have a %% non-blocking send operation exposed in its API %% and we don't want to run the distribution %% controller under high priority. Therefore this %% separate process with max prio that dispatches %% ticks. %% dist_cntrlr_tick_handler(Socket) -> receive tick -> %% May block due to busy port... sock_send(Socket, ""); _ -> ok end, dist_cntrlr_tick_handler(Socket). spawn_dist_cntrlr(Socket) -> spawn_opt(?MODULE, dist_cntrlr_setup, [Socket], [{priority, max}] ++ ?DIST_CNTRL_COMMON_SPAWN_OPTS). dist_cntrlr_setup(Socket) -> TickHandler = spawn_opt(?MODULE, dist_cntrlr_tick_handler, [Socket], [link, {priority, max}] ++ ?DIST_CNTRL_COMMON_SPAWN_OPTS), dist_cntrlr_setup_loop(Socket, TickHandler, undefined). %% %% During the handshake phase we loop in dist_cntrlr_setup(). %% When the connection is up we spawn an input handler and %% continue as output handler. %% dist_cntrlr_setup_loop(Socket, TickHandler, Sup) -> receive {tcp_closed, Socket} -> exit(connection_closed); {Ref, From, {supervisor, Pid}} -> Res = link(Pid), From ! {Ref, Res}, dist_cntrlr_setup_loop(Socket, TickHandler, Pid); {Ref, From, tick_handler} -> From ! {Ref, TickHandler}, dist_cntrlr_setup_loop(Socket, TickHandler, Sup); {Ref, From, socket} -> From ! {Ref, Socket}, dist_cntrlr_setup_loop(Socket, TickHandler, Sup); {Ref, From, {send, Packet}} -> Res = gen_tcp:send(Socket, Packet), From ! {Ref, Res}, dist_cntrlr_setup_loop(Socket, TickHandler, Sup); {Ref, From, {recv, Length, Timeout}} -> Res = gen_tcp:recv(Socket, Length, Timeout), From ! {Ref, Res}, dist_cntrlr_setup_loop(Socket, TickHandler, Sup); {Ref, From, getll} -> From ! {Ref, {ok, self()}}, dist_cntrlr_setup_loop(Socket, TickHandler, Sup); {Ref, From, {address, Node}} -> Res = case inet:peername(Socket) of {ok, Address} -> case split_node(atom_to_list(Node), $@, []) of [_,Host] -> #net_address{address=Address,host=Host, protocol=tcp, family=inet}; _ -> {error, no_node} end end, From ! {Ref, Res}, dist_cntrlr_setup_loop(Socket, TickHandler, Sup); {Ref, From, pre_nodeup} -> Res = inet:setopts(Socket, [{active, false}, {packet, 4}, nodelay()]), From ! {Ref, Res}, dist_cntrlr_setup_loop(Socket, TickHandler, Sup); {Ref, From, post_nodeup} -> Res = inet:setopts(Socket, [{active, false}, {packet, 4}, nodelay()]), From ! {Ref, Res}, dist_cntrlr_setup_loop(Socket, TickHandler, Sup); {Ref, From, {handshake_complete, _Node, DHandle}} -> From ! {Ref, ok}, %% Handshake complete! Begin dispatching traffic... %% We use separate process for dispatching input. This %% is not necessary, but it enables parallel execution %% of independent work loads at the same time as it %% simplifies the the implementation... InputHandler = spawn_opt(?MODULE, dist_cntrlr_input_setup, [DHandle, Socket, Sup], [link] ++ ?DIST_CNTRL_COMMON_SPAWN_OPTS), flush_controller(InputHandler, Socket), gen_tcp:controlling_process(Socket, InputHandler), flush_controller(InputHandler, Socket), ok = erlang:dist_ctrl_input_handler(DHandle, InputHandler), InputHandler ! DHandle, %% From now on we execute on normal priority process_flag(priority, normal), erlang:dist_ctrl_get_data_notification(DHandle), case init:get_argument(gen_tcp_dist_output_loop) of error -> dist_cntrlr_output_loop(DHandle, Socket); {ok, [[ModStr, FuncStr]]} -> % For testing... apply(list_to_atom(ModStr), list_to_atom(FuncStr), [DHandle, Socket]) end end. %% We use active 10 for good throughput while still %% maintaining back-pressure if the input controller %% isn't able to handle all incoming messages... -define(ACTIVE_INPUT, 10). dist_cntrlr_input_setup(DHandle, Socket, Sup) -> link(Sup), %% Ensure we don't try to put data before we are registered %% as input handler... receive DHandle -> dist_cntrlr_input_loop(DHandle, Socket, 0) end. dist_cntrlr_input_loop(DHandle, Socket, N) when N =< ?ACTIVE_INPUT/2 -> inet:setopts(Socket, [{active, ?ACTIVE_INPUT - N}]), dist_cntrlr_input_loop(DHandle, Socket, ?ACTIVE_INPUT); dist_cntrlr_input_loop(DHandle, Socket, N) -> receive {tcp_closed, Socket} -> %% Connection to remote node terminated... exit(connection_closed); {tcp, Socket, Data} -> %% Incoming data from remote node... try erlang:dist_ctrl_put_data(DHandle, Data) catch _ : _ -> death_row() end, dist_cntrlr_input_loop(DHandle, Socket, N-1); _ -> %% Ignore... dist_cntrlr_input_loop(DHandle, Socket, N) end. dist_cntrlr_send_data(DHandle, Socket) -> case erlang:dist_ctrl_get_data(DHandle) of none -> erlang:dist_ctrl_get_data_notification(DHandle); Data -> sock_send(Socket, Data), dist_cntrlr_send_data(DHandle, Socket) end. dist_cntrlr_output_loop(DHandle, Socket) -> receive dist_data -> %% Outgoing data from this node... try dist_cntrlr_send_data(DHandle, Socket) catch _ : _ -> death_row() end, dist_cntrlr_output_loop(DHandle, Socket); {send, From, Ref, Data} -> %% This is for testing only! %% %% Needed by some OTP distribution %% test suites... sock_send(Socket, Data), From ! {Ref, ok}, dist_cntrlr_output_loop(DHandle, Socket); _ -> %% Drop garbage message... dist_cntrlr_output_loop(DHandle, Socket) end. sock_send(Socket, Data) -> try gen_tcp:send(Socket, Data) of ok -> ok; {error, Reason} -> death_row({send_error, Reason}) catch Type : Reason -> death_row({send_error, {Type, Reason}}) end. death_row() -> death_row(connection_closed). death_row(normal) -> %% We do not want to exit with normal %% exit reason since it won't bring down %% linked processes... death_row(); death_row(Reason) -> %% When the connection is on its way down operations %% begin to fail. We catch the failures and call %% this function waiting for termination. We should %% be terminated by one of our links to the other %% involved parties that began bringing the %% connection down. By waiting for termination we %% avoid altering the exit reason for the connection %% teardown. We however limit the wait to 5 seconds %% and bring down the connection ourselves if not %% terminated... receive after 5000 -> exit(Reason) end.