Loading README.md +1 −0 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ Available versions: | JInterface | Erlang versions supported | |------------|---------------------------| | 1.11.1 | >=23.3, <23.3.4.12 | | 1.11 | >=23, <23.3 | | 1.10.1 | >=22.1, <23 | | 1.10 | >=22, <22.1 | Loading java_src/com/ericsson/otp/erlang/AbstractConnection.java +91 −14 Original line number Diff line number Diff line Loading @@ -76,6 +76,8 @@ public abstract class AbstractConnection extends Thread { protected static final int exitTTTag = 13; protected static final int regSendTTTag = 16; protected static final int exit2TTTag = 18; protected static final int unlinkIdTag = 35; protected static final int unlinkIdAckTag = 36; // MD5 challenge messsage tags protected static final int ChallengeReply = 'r'; Loading Loading @@ -355,10 +357,8 @@ public abstract class AbstractConnection extends Thread { // link to pid /** * Create a link between the local node and the specified process on the * remote node. If the link is still active when the remote process * terminates, an exit signal will be sent to this connection. Use * {@link #sendUnlink unlink()} to remove the link. * * Send link signal to remote process. * * @param dest * the Erlang PID of the remote process. Loading Loading @@ -393,9 +393,8 @@ public abstract class AbstractConnection extends Thread { } /** * Remove a link between the local node and the specified process on the * remote node. This method deactivates links created with {@link #sendLink * link()}. * * Send unlink signal to remote process. * * @param dest * the Erlang PID of the remote process. Loading @@ -404,7 +403,8 @@ public abstract class AbstractConnection extends Thread { * if the connection is not active or a communication error * occurs. */ protected void sendUnlink(final OtpErlangPid from, final OtpErlangPid dest) protected void sendUnlink(final OtpErlangPid from, final OtpErlangPid dest, long unlink_id) throws IOException { if (!connected) { throw new IOException("Not connected"); Loading @@ -417,11 +417,29 @@ public abstract class AbstractConnection extends Thread { header.write1(passThrough); header.write1(version); if ((peer.flags & AbstractNode.dFlagUnlinkId) != 0) { // header header.write_tuple_head(4); header.write_long(unlinkIdTag); header.write_long(unlink_id); header.write_any(from); header.write_any(dest); } else { /* * A node that isn't capable of talking the new link protocol. * * Send an old unlink op, and send ourselves an unlink-ack. We may * end up in an inconsistent state as we could before the new link * protocol was introduced... */ // header header.write_tuple_head(3); header.write_long(unlinkTag); header.write_any(from); header.write_any(dest); deliver(new OtpMsg(unlinkIdAckTag, dest, from, unlink_id)); } // fix up length in preamble header.poke4BE(0, header.size() - 4); Loading @@ -429,6 +447,45 @@ public abstract class AbstractConnection extends Thread { do_send(header); } /** * Send unlink acknowledgment signal to remote process. * * @param dest * the Erlang PID of the remote process. * * @exception java.io.IOException * if the connection is not active or a communication error * occurs. */ protected void sendUnlinkAck(final OtpErlangPid from, final OtpErlangPid dest, long unlink_id) throws IOException { if (!connected) { throw new IOException("Not connected"); } if ((peer.flags & AbstractNode.dFlagUnlinkId) != 0) { @SuppressWarnings("resource") final OtpOutputStream header = new OtpOutputStream(headerLen); // preamble: 4 byte length + "passthrough" tag header.write4BE(0); // reserve space for length header.write1(passThrough); header.write1(version); // header header.write_tuple_head(4); header.write_long(unlinkIdAckTag); header.write_long(unlink_id); header.write_any(from); header.write_any(dest); // fix up length in preamble header.poke4BE(0, header.size() - 4); do_send(header); } } /* used internally when "processes" terminate */ protected void sendExit(final OtpErlangPid from, final OtpErlangPid dest, final OtpErlangObject reason) throws IOException { Loading Loading @@ -685,7 +742,21 @@ public abstract class AbstractConnection extends Thread { from = (OtpErlangPid) head.elementAt(1); to = (OtpErlangPid) head.elementAt(2); deliver(new OtpMsg(tag, from, to)); deliver(new OtpMsg(tag, from, to, 0)); break; case unlinkIdTag: // { UNLINK_ID, UnlinkId, FromPid, ToPid} case unlinkIdAckTag: // { UNLINK_ID_Ack, UnlinkId, FromPid, ToPid} if (traceLevel >= ctrlThreshold) { System.out.println("<- " + headerType(head) + " " + head); } long unlink_id = ((OtpErlangLong) head.elementAt(1)).longValue(); from = (OtpErlangPid) head.elementAt(2); to = (OtpErlangPid) head.elementAt(3); deliver(new OtpMsg(tag, from, to, unlink_id)); break; // absolutely no idea what to do with these, so we ignore Loading Loading @@ -877,6 +948,12 @@ public abstract class AbstractConnection extends Thread { case unlinkTag: return "UNLINK"; case unlinkIdTag: return "UNLINK_ID"; case unlinkIdAckTag: return "UNLINK_ID_ACK"; case regSendTag: return "REG_SEND"; Loading java_src/com/ericsson/otp/erlang/AbstractNode.java +3 −1 Original line number Diff line number Diff line Loading @@ -94,6 +94,7 @@ public class AbstractNode implements OtpTransportFactory { static final int dFlagMapTag = 0x20000; static final int dFlagBigCreation = 0x40000; static final int dFlagHandshake23 = 0x1000000; static final int dFlagUnlinkId = 0x2000000; int ntype = NTYPE_R6; int proto = 0; // tcp/ip Loading @@ -105,7 +106,8 @@ public class AbstractNode implements OtpTransportFactory { | dflagNewFunTags | dFlagUtf8Atoms | dFlagMapTag | dFlagExportPtrTag | dFlagBigCreation | dFlagHandshake23; | dFlagHandshake23 | dFlagUnlinkId; /* initialize hostname and default cookie */ static { Loading java_src/com/ericsson/otp/erlang/Link.java +11 −0 Original line number Diff line number Diff line Loading @@ -23,11 +23,14 @@ package com.ericsson.otp.erlang; class Link { private final OtpErlangPid local; private final OtpErlangPid remote; private long unlinking = 0; private int hashCodeValue = 0; public Link(final OtpErlangPid local, final OtpErlangPid remote) { this.local = local; this.remote = remote; this.unlinking = 0; } public OtpErlangPid local() { Loading @@ -47,6 +50,14 @@ class Link { || local.equals(aremote) && remote.equals(alocal); } public long getUnlinking() { return this.unlinking; } public void setUnlinking(long unlink_id) { this.unlinking = unlink_id; } @Override public int hashCode() { if (hashCodeValue == 0) { Loading java_src/com/ericsson/otp/erlang/Links.java +94 −38 Original line number Diff line number Diff line Loading @@ -23,6 +23,7 @@ package com.ericsson.otp.erlang; class Links { Link[] links; int count; int active; Links() { this(10); Loading @@ -31,68 +32,132 @@ class Links { Links(final int initialSize) { links = new Link[initialSize]; count = 0; active = 0; } synchronized void addLink(final OtpErlangPid local, final OtpErlangPid remote) { if (find(local, remote) == -1) { // Try add link and return if it was added or not... // If already existing it will not be added again. // If force is true it is added even if it is // currently unlinking; otherwise not. synchronized boolean addLink(final OtpErlangPid local, final OtpErlangPid remote, final boolean force) { int i = find(local, remote); if (i != -1) { if (links[i].getUnlinking() != 0 && force) { links[i].setUnlinking(0); active++; return true; } return false; } else { if (count >= links.length) { final Link[] tmp = new Link[count * 2]; System.arraycopy(links, 0, tmp, 0, count); links = tmp; } links[count++] = new Link(local, remote); active++; return true; } } synchronized void removeLink(final OtpErlangPid local, // Try remove link and return whether it was active or not... synchronized boolean removeLink(final OtpErlangPid local, final OtpErlangPid remote) { int i; if ((i = find(local, remote)) != -1) { long unlinking = links[i].getUnlinking(); count--; links[i] = links[count]; links[count] = null; if (unlinking == 0) { active--; return true; } } return false; } synchronized boolean exists(final OtpErlangPid local, // Try remove active link and return whether it was removed or not... synchronized boolean removeActiveLink(final OtpErlangPid local, final OtpErlangPid remote) { return find(local, remote) != -1; int i; if ((i = find(local, remote)) != -1) { long unlinking = links[i].getUnlinking(); if (unlinking != 0) return false; count--; active--; links[i] = links[count]; links[count] = null; return true; } return false; } synchronized int find(final OtpErlangPid local, final OtpErlangPid remote) { for (int i = 0; i < count; i++) { if (links[i].equals(local, remote)) { return i; // Remove link if unlink_id match and return whether it was removed or not... synchronized boolean removeUnlinkingLink(final OtpErlangPid local, final OtpErlangPid remote, final long unlink_id) { int i; if (unlink_id == 0) { return false; } if ((i = find(local, remote)) != -1) { long unlinking = links[i].getUnlinking(); if (unlinking != unlink_id) return false; count--; links[i] = links[count]; links[count] = null; return true; } return -1; return false; } synchronized boolean setUnlinking(final OtpErlangPid local, final OtpErlangPid remote, final long unlink_id) { int i; if (unlink_id == 0) { return false; } int count() { return count; if ((i = find(local, remote)) != -1) { if (links[i].getUnlinking() == 0) { links[i].setUnlinking(unlink_id); active--; return true; } } return false; } /* all local pids get notified about broken connection */ synchronized OtpErlangPid[] localPids() { OtpErlangPid[] ret = null; if (count != 0) { ret = new OtpErlangPid[count]; synchronized int find(final OtpErlangPid local, final OtpErlangPid remote) { for (int i = 0; i < count; i++) { ret[i] = links[i].local(); if (links[i].equals(local, remote)) { return i; } } return ret; return -1; } /* all remote pids get notified about failed pid */ synchronized OtpErlangPid[] remotePids() { OtpErlangPid[] ret = null; if (count != 0) { ret = new OtpErlangPid[count]; for (int i = 0; i < count; i++) { ret[i] = links[i].remote(); if (active != 0) { int a = 0; ret = new OtpErlangPid[active]; for (int i = 0; a < active; i++) { if (links[i].getUnlinking() == 0) { ret[a++] = links[i].remote(); } } } return ret; Loading @@ -112,13 +177,4 @@ class Links { return ret; } /* returns a copy of the link table */ synchronized Link[] links() { Link[] ret = null; if (count != 0) { ret = new Link[count]; System.arraycopy(links, 0, ret, 0, count); } return ret; } } Loading
README.md +1 −0 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ Available versions: | JInterface | Erlang versions supported | |------------|---------------------------| | 1.11.1 | >=23.3, <23.3.4.12 | | 1.11 | >=23, <23.3 | | 1.10.1 | >=22.1, <23 | | 1.10 | >=22, <22.1 | Loading
java_src/com/ericsson/otp/erlang/AbstractConnection.java +91 −14 Original line number Diff line number Diff line Loading @@ -76,6 +76,8 @@ public abstract class AbstractConnection extends Thread { protected static final int exitTTTag = 13; protected static final int regSendTTTag = 16; protected static final int exit2TTTag = 18; protected static final int unlinkIdTag = 35; protected static final int unlinkIdAckTag = 36; // MD5 challenge messsage tags protected static final int ChallengeReply = 'r'; Loading Loading @@ -355,10 +357,8 @@ public abstract class AbstractConnection extends Thread { // link to pid /** * Create a link between the local node and the specified process on the * remote node. If the link is still active when the remote process * terminates, an exit signal will be sent to this connection. Use * {@link #sendUnlink unlink()} to remove the link. * * Send link signal to remote process. * * @param dest * the Erlang PID of the remote process. Loading Loading @@ -393,9 +393,8 @@ public abstract class AbstractConnection extends Thread { } /** * Remove a link between the local node and the specified process on the * remote node. This method deactivates links created with {@link #sendLink * link()}. * * Send unlink signal to remote process. * * @param dest * the Erlang PID of the remote process. Loading @@ -404,7 +403,8 @@ public abstract class AbstractConnection extends Thread { * if the connection is not active or a communication error * occurs. */ protected void sendUnlink(final OtpErlangPid from, final OtpErlangPid dest) protected void sendUnlink(final OtpErlangPid from, final OtpErlangPid dest, long unlink_id) throws IOException { if (!connected) { throw new IOException("Not connected"); Loading @@ -417,11 +417,29 @@ public abstract class AbstractConnection extends Thread { header.write1(passThrough); header.write1(version); if ((peer.flags & AbstractNode.dFlagUnlinkId) != 0) { // header header.write_tuple_head(4); header.write_long(unlinkIdTag); header.write_long(unlink_id); header.write_any(from); header.write_any(dest); } else { /* * A node that isn't capable of talking the new link protocol. * * Send an old unlink op, and send ourselves an unlink-ack. We may * end up in an inconsistent state as we could before the new link * protocol was introduced... */ // header header.write_tuple_head(3); header.write_long(unlinkTag); header.write_any(from); header.write_any(dest); deliver(new OtpMsg(unlinkIdAckTag, dest, from, unlink_id)); } // fix up length in preamble header.poke4BE(0, header.size() - 4); Loading @@ -429,6 +447,45 @@ public abstract class AbstractConnection extends Thread { do_send(header); } /** * Send unlink acknowledgment signal to remote process. * * @param dest * the Erlang PID of the remote process. * * @exception java.io.IOException * if the connection is not active or a communication error * occurs. */ protected void sendUnlinkAck(final OtpErlangPid from, final OtpErlangPid dest, long unlink_id) throws IOException { if (!connected) { throw new IOException("Not connected"); } if ((peer.flags & AbstractNode.dFlagUnlinkId) != 0) { @SuppressWarnings("resource") final OtpOutputStream header = new OtpOutputStream(headerLen); // preamble: 4 byte length + "passthrough" tag header.write4BE(0); // reserve space for length header.write1(passThrough); header.write1(version); // header header.write_tuple_head(4); header.write_long(unlinkIdAckTag); header.write_long(unlink_id); header.write_any(from); header.write_any(dest); // fix up length in preamble header.poke4BE(0, header.size() - 4); do_send(header); } } /* used internally when "processes" terminate */ protected void sendExit(final OtpErlangPid from, final OtpErlangPid dest, final OtpErlangObject reason) throws IOException { Loading Loading @@ -685,7 +742,21 @@ public abstract class AbstractConnection extends Thread { from = (OtpErlangPid) head.elementAt(1); to = (OtpErlangPid) head.elementAt(2); deliver(new OtpMsg(tag, from, to)); deliver(new OtpMsg(tag, from, to, 0)); break; case unlinkIdTag: // { UNLINK_ID, UnlinkId, FromPid, ToPid} case unlinkIdAckTag: // { UNLINK_ID_Ack, UnlinkId, FromPid, ToPid} if (traceLevel >= ctrlThreshold) { System.out.println("<- " + headerType(head) + " " + head); } long unlink_id = ((OtpErlangLong) head.elementAt(1)).longValue(); from = (OtpErlangPid) head.elementAt(2); to = (OtpErlangPid) head.elementAt(3); deliver(new OtpMsg(tag, from, to, unlink_id)); break; // absolutely no idea what to do with these, so we ignore Loading Loading @@ -877,6 +948,12 @@ public abstract class AbstractConnection extends Thread { case unlinkTag: return "UNLINK"; case unlinkIdTag: return "UNLINK_ID"; case unlinkIdAckTag: return "UNLINK_ID_ACK"; case regSendTag: return "REG_SEND"; Loading
java_src/com/ericsson/otp/erlang/AbstractNode.java +3 −1 Original line number Diff line number Diff line Loading @@ -94,6 +94,7 @@ public class AbstractNode implements OtpTransportFactory { static final int dFlagMapTag = 0x20000; static final int dFlagBigCreation = 0x40000; static final int dFlagHandshake23 = 0x1000000; static final int dFlagUnlinkId = 0x2000000; int ntype = NTYPE_R6; int proto = 0; // tcp/ip Loading @@ -105,7 +106,8 @@ public class AbstractNode implements OtpTransportFactory { | dflagNewFunTags | dFlagUtf8Atoms | dFlagMapTag | dFlagExportPtrTag | dFlagBigCreation | dFlagHandshake23; | dFlagHandshake23 | dFlagUnlinkId; /* initialize hostname and default cookie */ static { Loading
java_src/com/ericsson/otp/erlang/Link.java +11 −0 Original line number Diff line number Diff line Loading @@ -23,11 +23,14 @@ package com.ericsson.otp.erlang; class Link { private final OtpErlangPid local; private final OtpErlangPid remote; private long unlinking = 0; private int hashCodeValue = 0; public Link(final OtpErlangPid local, final OtpErlangPid remote) { this.local = local; this.remote = remote; this.unlinking = 0; } public OtpErlangPid local() { Loading @@ -47,6 +50,14 @@ class Link { || local.equals(aremote) && remote.equals(alocal); } public long getUnlinking() { return this.unlinking; } public void setUnlinking(long unlink_id) { this.unlinking = unlink_id; } @Override public int hashCode() { if (hashCodeValue == 0) { Loading
java_src/com/ericsson/otp/erlang/Links.java +94 −38 Original line number Diff line number Diff line Loading @@ -23,6 +23,7 @@ package com.ericsson.otp.erlang; class Links { Link[] links; int count; int active; Links() { this(10); Loading @@ -31,68 +32,132 @@ class Links { Links(final int initialSize) { links = new Link[initialSize]; count = 0; active = 0; } synchronized void addLink(final OtpErlangPid local, final OtpErlangPid remote) { if (find(local, remote) == -1) { // Try add link and return if it was added or not... // If already existing it will not be added again. // If force is true it is added even if it is // currently unlinking; otherwise not. synchronized boolean addLink(final OtpErlangPid local, final OtpErlangPid remote, final boolean force) { int i = find(local, remote); if (i != -1) { if (links[i].getUnlinking() != 0 && force) { links[i].setUnlinking(0); active++; return true; } return false; } else { if (count >= links.length) { final Link[] tmp = new Link[count * 2]; System.arraycopy(links, 0, tmp, 0, count); links = tmp; } links[count++] = new Link(local, remote); active++; return true; } } synchronized void removeLink(final OtpErlangPid local, // Try remove link and return whether it was active or not... synchronized boolean removeLink(final OtpErlangPid local, final OtpErlangPid remote) { int i; if ((i = find(local, remote)) != -1) { long unlinking = links[i].getUnlinking(); count--; links[i] = links[count]; links[count] = null; if (unlinking == 0) { active--; return true; } } return false; } synchronized boolean exists(final OtpErlangPid local, // Try remove active link and return whether it was removed or not... synchronized boolean removeActiveLink(final OtpErlangPid local, final OtpErlangPid remote) { return find(local, remote) != -1; int i; if ((i = find(local, remote)) != -1) { long unlinking = links[i].getUnlinking(); if (unlinking != 0) return false; count--; active--; links[i] = links[count]; links[count] = null; return true; } return false; } synchronized int find(final OtpErlangPid local, final OtpErlangPid remote) { for (int i = 0; i < count; i++) { if (links[i].equals(local, remote)) { return i; // Remove link if unlink_id match and return whether it was removed or not... synchronized boolean removeUnlinkingLink(final OtpErlangPid local, final OtpErlangPid remote, final long unlink_id) { int i; if (unlink_id == 0) { return false; } if ((i = find(local, remote)) != -1) { long unlinking = links[i].getUnlinking(); if (unlinking != unlink_id) return false; count--; links[i] = links[count]; links[count] = null; return true; } return -1; return false; } synchronized boolean setUnlinking(final OtpErlangPid local, final OtpErlangPid remote, final long unlink_id) { int i; if (unlink_id == 0) { return false; } int count() { return count; if ((i = find(local, remote)) != -1) { if (links[i].getUnlinking() == 0) { links[i].setUnlinking(unlink_id); active--; return true; } } return false; } /* all local pids get notified about broken connection */ synchronized OtpErlangPid[] localPids() { OtpErlangPid[] ret = null; if (count != 0) { ret = new OtpErlangPid[count]; synchronized int find(final OtpErlangPid local, final OtpErlangPid remote) { for (int i = 0; i < count; i++) { ret[i] = links[i].local(); if (links[i].equals(local, remote)) { return i; } } return ret; return -1; } /* all remote pids get notified about failed pid */ synchronized OtpErlangPid[] remotePids() { OtpErlangPid[] ret = null; if (count != 0) { ret = new OtpErlangPid[count]; for (int i = 0; i < count; i++) { ret[i] = links[i].remote(); if (active != 0) { int a = 0; ret = new OtpErlangPid[active]; for (int i = 0; a < active; i++) { if (links[i].getUnlinking() == 0) { ret[a++] = links[i].remote(); } } } return ret; Loading @@ -112,13 +177,4 @@ class Links { return ret; } /* returns a copy of the link table */ synchronized Link[] links() { Link[] ret = null; if (count != 0) { ret = new Link[count]; System.arraycopy(links, 0, ret, 0, count); } return ret; } }