Logo Search packages:      
Sourcecode: zeroc-ice-java version File versions  Download package

ConnectionI.java

// **********************************************************************
//
// Copyright (c) 2003-2006 ZeroC, Inc. All rights reserved.
//
// This copy of Ice is licensed to you under the terms described in the
// ICE_LICENSE file included in this distribution.
//
// **********************************************************************

package Ice;

public final class ConnectionI extends IceInternal.EventHandler implements Connection
{
    public java.lang.Object
    clone()
        throws java.lang.CloneNotSupportedException
    {
        return super.clone();
    }

    public int
    ice_hash()
    {
        return hashCode();
    }

    public void
    validate()
    {
      if(!_endpoint.datagram()) // Datagram connections are always implicitly validated.
      {
          boolean active;
          
          synchronized(this)
          {
            if(_instance.threadPerConnection() && _threadPerConnection != Thread.currentThread())
            {
                //
                // In thread per connection mode, this connection's thread
                // will take care of connection validation. Therefore all we
                // have to do here is to wait until this thread has completed
                // validation.
                //
                while(_state == StateNotValidated)
                {
                  try
                  {
                      wait();
                  }
                  catch(InterruptedException ex)
                  {
                  }
                }
                
                if(_state >= StateClosing)
                {
                  assert(_exception != null);
                  throw _exception;
                }
                
                return;
            }

            //
            // The connection might already be closed (e.g.: the communicator 
            // was destroyed or object adapter deactivated.)
            //
            assert(_state == StateNotValidated || _state == StateClosed);
            if(_state == StateClosed)
            {
                assert(_exception != null);
                throw _exception;
            }
            
            if(_adapter != null)
            {
                active = true; // The server side has the active role for connection validation.
            }
            else
            {
                active = false; // The client side has the passive role for connection validation.
            }         
          }

          try
          {
            int timeout;
            IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
            if(defaultsAndOverrides.overrideConnectTimeout)
            {
                timeout = defaultsAndOverrides.overrideConnectTimeoutValue;
            }
            else
            {
                timeout = _endpoint.timeout();
            }

            if(active)
            {
                synchronized(_sendMutex)
                {
                  if(_transceiver == null) // Has the transceiver already been closed?
                  {
                      assert(_exception != null);
                      throw _exception; // The exception is immutable at this point.
                  }

                  IceInternal.BasicStream os = new IceInternal.BasicStream(_instance);
                  os.writeBlob(IceInternal.Protocol.magic);
                  os.writeByte(IceInternal.Protocol.protocolMajor);
                  os.writeByte(IceInternal.Protocol.protocolMinor);
                  os.writeByte(IceInternal.Protocol.encodingMajor);
                  os.writeByte(IceInternal.Protocol.encodingMinor);
                  os.writeByte(IceInternal.Protocol.validateConnectionMsg);
                  os.writeByte((byte)0); // Compression status (always zero for validate connection).
                  os.writeInt(IceInternal.Protocol.headerSize); // Message size.
                  IceInternal.TraceUtil.traceHeader("sending validate connection", os, _logger, _traceLevels);
                  try
                  {
                      _transceiver.write(os, timeout);
                  }
                  catch(Ice.TimeoutException ex)
                  {
                      throw new Ice.ConnectTimeoutException();
                  }
                }
            }
            else
            {
                IceInternal.BasicStream is = new IceInternal.BasicStream(_instance);
                is.resize(IceInternal.Protocol.headerSize, true);
                is.pos(0);
                try
                {
                  _transceiver.read(is, timeout);
                }
                catch(Ice.TimeoutException ex)
                {
                  throw new Ice.ConnectTimeoutException();
                }
                assert(is.pos() == IceInternal.Protocol.headerSize);
                is.pos(0);
                byte[] m = is.readBlob(4);
                if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] ||
                   m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3])
                {
                    BadMagicException ex = new BadMagicException();
                  ex.badMagic = m;
                  throw ex;
                }
                byte pMajor = is.readByte();
                byte pMinor = is.readByte();
                if(pMajor != IceInternal.Protocol.protocolMajor)
                {
                  UnsupportedProtocolException e = new UnsupportedProtocolException();
                  e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor;
                  e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor;
                  e.major = IceInternal.Protocol.protocolMajor;
                  e.minor = IceInternal.Protocol.protocolMinor;
                  throw e;
                }
                byte eMajor = is.readByte();
                byte eMinor = is.readByte();
                if(eMajor != IceInternal.Protocol.encodingMajor)
                {
                  UnsupportedEncodingException e = new UnsupportedEncodingException();
                  e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor;
                  e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor;
                  e.major = IceInternal.Protocol.encodingMajor;
                  e.minor = IceInternal.Protocol.encodingMinor;
                  throw e;
                }
                byte messageType = is.readByte();
                if(messageType != IceInternal.Protocol.validateConnectionMsg)
                {
                  throw new ConnectionNotValidatedException();
                }
                byte compress = is.readByte(); // Ignore compression status for validate connection.
                int size = is.readInt();
                if(size != IceInternal.Protocol.headerSize)
                {
                  throw new IllegalMessageSizeException();
                }
                IceInternal.TraceUtil.traceHeader("received validate connection", is, _logger, _traceLevels);
            }
          }
          catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write().
          {
            synchronized(this)
            {
                setState(StateClosed, ex.get());
                assert(_exception != null);
                throw _exception;
            }
          }
          catch(LocalException ex)
          {
            synchronized(this)
            {
                setState(StateClosed, ex);
                assert(_exception != null);
                throw _exception;
            }
          }
      }
      
      synchronized(this)
      {
          if(_acmTimeout > 0)
          {
            _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
          }
          
          //
          // We start out in holding state.
          //
          setState(StateHolding);
      }
    }

    public synchronized void
    activate()
    {
      while(_state == StateNotValidated)
      {
          try
          {
            wait();
          }
          catch(InterruptedException ex)
          {
          }
      }

      setState(StateActive);
    }

    public synchronized void
    hold()
    {
      while(_state == StateNotValidated)
      {
          try
          {
            wait();
          }
          catch(InterruptedException ex)
          {
          }
      }

      setState(StateHolding);
    }

    // DestructionReason.
    public final static int ObjectAdapterDeactivated = 0;
    public final static int CommunicatorDestroyed = 1;

    public synchronized void
    destroy(int reason)
    {
      switch(reason)
      {
          case ObjectAdapterDeactivated:
          {
            setState(StateClosing, new ObjectAdapterDeactivatedException());
            break;
          }
          
          case CommunicatorDestroyed:
          {
            setState(StateClosing, new CommunicatorDestroyedException());
            break;
          }
      }
    }

    public synchronized void
    close(boolean force)
    {
      if(force)
      {
          setState(StateClosed, new ForcedCloseConnectionException());
      }
      else
      {
          //
          // If we do a graceful shutdown, then we wait until all
          // outstanding requests have been completed. Otherwise,
          // the CloseConnectionException will cause all outstanding
          // requests to be retried, regardless of whether the
          // server has processed them or not.
          //
          while(!_requests.isEmpty() || !_asyncRequests.isEmpty())
          {
            try
            {
                wait();
            }
            catch(InterruptedException ex)
            {
            }
          }

          setState(StateClosing, new CloseConnectionException());
      }
    }

    public synchronized boolean
    isDestroyed()
    {
      return _state >= StateClosing;
    }

    public boolean
    isFinished()
    {
      Thread threadPerConnection = null;

      synchronized(this)
      {
          if(_transceiver != null || _dispatchCount != 0 ||
             (_threadPerConnection != null &&   _threadPerConnection.isAlive()))
          {
            return false;
          }

          assert(_state == StateClosed);

          threadPerConnection = _threadPerConnection;
          _threadPerConnection = null;
      }

      if(threadPerConnection != null)
      {
          while(true)
          {
            try
            {
                threadPerConnection.join();
                break;
            }
            catch(InterruptedException ex)
            {
            }
          }
      }

      return true;
    }

    public synchronized void
    throwException()
    {
        if(_exception != null)
        {
            assert(_state >= StateClosing);
            throw _exception;
        }
    }

    public synchronized void
    waitUntilHolding()
    {
      while(_state < StateHolding || _dispatchCount > 0)
      {
          try
          {
            wait();
          }
          catch(InterruptedException ex)
          {
          }
        }
    }

    public void
    waitUntilFinished()
    {
      Thread threadPerConnection = null;

      synchronized(this)
      {
          //
          // We wait indefinitely until connection closing has been
          // initiated. We also wait indefinitely until all outstanding
          // requests are completed. Otherwise we couldn't guarantee
          // that there are no outstanding calls when deactivate() is
          // called on the servant locators.
          //
          while(_state < StateClosing || _dispatchCount > 0)
          {
            try
            {
                wait();
            }
            catch(InterruptedException ex)
            {
            }
          }
          
          //
          // Now we must wait until close() has been called on the
          // transceiver.
          //
          while(_transceiver != null)
          {
            try
            {
                if(_state != StateClosed && _endpoint.timeout() >= 0)
                {
                  long absoluteWaitTime = _stateTime + _endpoint.timeout();
                  long waitTime = absoluteWaitTime - System.currentTimeMillis();
                  
                  if(waitTime > 0)
                  {
                      //
                      // We must wait a bit longer until we close this
                      // connection.
                      //
                      wait(waitTime);
                      if(System.currentTimeMillis() >= absoluteWaitTime)
                      {
                        setState(StateClosed, new CloseTimeoutException());
                      }
                  }
                  else
                  {
                      //
                      // We already waited long enough, so let's close this
                      // connection!
                      //
                      setState(StateClosed, new CloseTimeoutException());
                  }

                  //
                  // No return here, we must still wait until
                  // close() is called on the _transceiver.
                  //
                }
                else
                {
                  wait();
                }
            }
            catch(InterruptedException ex)
            {
            }
          }

          assert(_state == StateClosed);

          threadPerConnection = _threadPerConnection;
          _threadPerConnection = null;
      }

      if(threadPerConnection != null)
      {
          while(true)
          {
            try
            {
                threadPerConnection.join();
                break;
            }
            catch(InterruptedException ex)
            {
            }
          }
      }
    }

    public synchronized void
    monitor()
    {
      if(_state != StateActive)
      {
          return;
      }
      
      //
      // Check for timed out async requests.
      //
      java.util.Iterator i = _asyncRequests.entryIterator();
      while(i.hasNext())
      {
          IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next();
          IceInternal.OutgoingAsync out = (IceInternal.OutgoingAsync)e.getValue();
          if(out.__timedOut())
          {
            setState(StateClosed, new TimeoutException());
            return;
          }
      }

      //
      // Active connection management for idle connections.
      //
      if(_acmTimeout > 0 &&
         _requests.isEmpty() && _asyncRequests.isEmpty() &&
         !_batchStreamInUse && _batchStream.isEmpty() &&
         _dispatchCount == 0)
      {
          if(System.currentTimeMillis() >= _acmAbsoluteTimeoutMillis)
          {
            setState(StateClosing, new ConnectionTimeoutException());
            return;
          }
      }         
    }

    public void
    sendRequest(IceInternal.BasicStream os, IceInternal.Outgoing out, boolean compress)
        throws IceInternal.LocalExceptionWrapper
    {
      int requestId = 0;
      IceInternal.BasicStream stream = null;

      synchronized(this)
      {
          assert(!(out != null && _endpoint.datagram())); // Twoway requests cannot be datagrams.

          if(_exception != null)
          {
            //
            // If the connection is closed before we even have a chance
            // to send our request, we always try to send the request
            // again.
            //
            throw new IceInternal.LocalExceptionWrapper(_exception, true);
          }

          assert(_state > StateNotValidated);
          assert(_state < StateClosing);

          //
          // Only add to the request map if this is a twoway call.
          //
          if(out != null)
          {
            //
            // Create a new unique request ID.
            //
            requestId = _nextRequestId++;
            if(requestId <= 0)
            {
                _nextRequestId = 1;
                requestId = _nextRequestId++;
            }

            //
            // Fill in the request ID.
            //
            os.pos(IceInternal.Protocol.headerSize);
            os.writeInt(requestId);

            //
            // Add to the requests map.
            //
            _requests.put(requestId, out);
          }

          stream = doCompress(os, _overrideCompress ? _overrideCompressValue : compress);

          if(_acmTimeout > 0)
          {
            _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
          }
      }

      try
      {
          synchronized(_sendMutex)
          {
            if(_transceiver == null) // Has the transceiver already been closed?
            {
                assert(_exception != null);
                throw _exception; // The exception is immutable at this point.
            }

            //
            // Send the request.
            //
            IceInternal.TraceUtil.traceRequest("sending request", os, _logger, _traceLevels);
            _transceiver.write(stream, _endpoint.timeout());
          }
      }
      catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write().
      {
          synchronized(this)
          {
            setState(StateClosed, ex.get());
            assert(_exception != null);
            
            if(out != null)
            {
                //
                // If the request has already been removed from
                // the request map, we are out of luck. It would
                // mean that finished() has been called already,
                // and therefore the exception has been set using
                // the Outgoing::finished() callback. In this
                // case, we cannot throw the exception here,
                // because we must not both raise an exception and
                // have Outgoing::finished() called with an
                // exception. This means that in some rare cases,
                // a request will not be retried even though it
                // could. But I honestly don't know how I could
                // avoid this, without a very elaborate and
                // complex design, which would be bad for
                // performance.
                //
                IceInternal.Outgoing o = (IceInternal.Outgoing)_requests.remove(requestId);
                if(o != null)
                {
                  assert(o == out);
                  throw new IceInternal.LocalExceptionWrapper(_exception, ex.retry());
                }
            }
            else
            {
                throw new IceInternal.LocalExceptionWrapper(_exception, ex.retry());
            }
          }
      }
      catch(LocalException ex)
      {
          synchronized(this)
          {
            setState(StateClosed, ex);
            assert(_exception != null);
            
            if(out != null)
            {
                //
                // If the request has already been removed from
                // the request map, we are out of luck. It would
                // mean that finished() has been called already,
                // and therefore the exception has been set using
                // the Outgoing::finished() callback. In this
                // case, we cannot throw the exception here,
                // because we must not both raise an exception and
                // have Outgoing::finished() called with an
                // exception. This means that in some rare cases,
                // a request will not be retried even though it
                // could. But I honestly don't know how I could
                // avoid this, without a very elaborate and
                // complex design, which would be bad for
                // performance.
                //
                IceInternal.Outgoing o = (IceInternal.Outgoing)_requests.remove(requestId);
                if(o != null)
                {
                  assert(o == out);
                  throw _exception;
                }
            }
            else
            {
                throw _exception;
            }
          }
      }
    }

    public void
    sendAsyncRequest(IceInternal.BasicStream os, IceInternal.OutgoingAsync out, boolean compress)
        throws IceInternal.LocalExceptionWrapper
    {
      int requestId = 0;
      IceInternal.BasicStream stream = null;

      synchronized(this)
      {
          assert(!_endpoint.datagram()); // Twoway requests cannot be datagrams, and async implies twoway.

          if(_exception != null)
          {
            //
            // If the connection is closed before we even have a chance
            // to send our request, we always try to send the request
            // again.
            //
            throw new IceInternal.LocalExceptionWrapper(_exception, true);
          }

          assert(_state > StateNotValidated);
          assert(_state < StateClosing);

          //
          // Create a new unique request ID.
          //
          requestId = _nextRequestId++;
          if(requestId <= 0)
          {
            _nextRequestId = 1;
            requestId = _nextRequestId++;
          }
          
          //
          // Fill in the request ID.
          //
          os.pos(IceInternal.Protocol.headerSize);
          os.writeInt(requestId);
          
          //
          // Add to the async requests map.
          //
          _asyncRequests.put(requestId, out);

          stream = doCompress(os, _overrideCompress ? _overrideCompressValue : compress);

          if(_acmTimeout > 0)
          {
            _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
          }
      }

      try
      {
          synchronized(_sendMutex)
          {
            if(_transceiver == null) // Has the transceiver already been closed?
            {
                assert(_exception != null);
                throw _exception; // The exception is immutable at this point.
            }

            //
            // Send the request.
            //
            IceInternal.TraceUtil.traceRequest("sending asynchronous request", os, _logger, _traceLevels);
            _transceiver.write(stream, _endpoint.timeout());
          }
      }
      catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write().
      {
          synchronized(this)
          {
            setState(StateClosed, ex.get());
            assert(_exception != null);
            
            //
            // If the request has already been removed from the
            // async request map, we are out of luck. It would
            // mean that finished() has been called already, and
            // therefore the exception has been set using the
            // OutgoingAsync::__finished() callback. In this case,
            // we cannot throw the exception here, because we must
            // not both raise an exception and have
            // OutgoingAsync::__finished() called with an
            // exception. This means that in some rare cases, a
            // request will not be retried even though it
            // could. But I honestly don't know how I could avoid
            // this, without a very elaborate and complex design,
            // which would be bad for performance.
            //
            IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync)_asyncRequests.remove(requestId);
            if(o != null)
            {
                assert(o == out);
                throw new IceInternal.LocalExceptionWrapper(_exception, ex.retry());
            }
          }
      }
      catch(LocalException ex)
      {
          synchronized(this)
          {
            setState(StateClosed, ex);
            assert(_exception != null);
            
            //
            // If the request has already been removed from the
            // async request map, we are out of luck. It would
            // mean that finished() has been called already, and
            // therefore the exception has been set using the
            // OutgoingAsync::__finished() callback. In this case,
            // we cannot throw the exception here, because we must
            // not both raise an exception and have
            // OutgoingAsync::__finished() called with an
            // exception. This means that in some rare cases, a
            // request will not be retried even though it
            // could. But I honestly don't know how I could avoid
            // this, without a very elaborate and complex design,
            // which would be bad for performance.
            //
            IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync)_asyncRequests.remove(requestId);
            if(o != null)
            {
                assert(o == out);
                throw _exception;
            }
          }
      }
    }

    public synchronized void
    prepareBatchRequest(IceInternal.BasicStream os)
    {
      while(_batchStreamInUse && _exception == null)
      {
          try
          {
            wait();
          }
          catch(InterruptedException ex)
          {
          }
      }

        if(_exception != null)
        {
            throw _exception;
        }

        assert(_state > StateNotValidated);
      assert(_state < StateClosing);

        if(_batchStream.isEmpty())
        {
          try
          {
            _batchStream.writeBlob(IceInternal.Protocol.requestBatchHdr);
          }
          catch(LocalException ex)
          {
            setState(StateClosed, ex);
            throw ex;
          }
        }

        _batchStreamInUse = true;
      _batchStream.swap(os);

      //
      // The batch stream now belongs to the caller, until
      // finishBatchRequest() or abortBatchRequest() is called.
      //
    }

    public synchronized void
    finishBatchRequest(IceInternal.BasicStream os, boolean compress)
    {
      //
      // Get the batch stream back and increment the number of
      // requests in the batch.
        //
      _batchStream.swap(os);
      ++_batchRequestNum;

      //
      // We compress the whole batch if there is at least one compressed
      // message.
      //
      if(compress)
      {
          _batchRequestCompress = true;
      }

      //
      // Notify about the batch stream not being in use anymore.
      //
      assert(_batchStreamInUse);
        _batchStreamInUse = false;
      notifyAll();
    }

    public synchronized void
    abortBatchRequest()
    {
      //
      // Destroy and reset the batch stream and batch count. We
      // cannot save old requests in the batch stream, as they might
      // be corrupted due to incomplete marshaling.
      //
      _batchStream = new IceInternal.BasicStream(_instance);
      _batchRequestNum = 0;
      _batchRequestCompress = false;

      //
      // Notify about the batch stream not being in use anymore.
      //
      assert(_batchStreamInUse);
        _batchStreamInUse = false;
      notifyAll();
    }

    public void
    flushBatchRequests()
    {
      IceInternal.BasicStream stream = null;

      synchronized(this)
      {
          while(_batchStreamInUse && _exception == null)
          {
            try
            {
                wait();
            }
            catch(InterruptedException ex)
            {
            }
          }
          
          if(_exception != null)
          {
            throw _exception;
          }

          if(_batchStream.isEmpty())
          {
            return; // Nothing to do.
          }

          assert(_state > StateNotValidated);
          assert(_state < StateClosing);

          //
          // Fill in the message size.
          //
          _batchStream.pos(10);
          _batchStream.writeInt(_batchStream.size());

          //
          // Fill in the number of requests in the batch.
          //
          _batchStream.writeInt(_batchRequestNum);

          stream = doCompress(_batchStream, _overrideCompress ? _overrideCompressValue : _batchRequestCompress);

          if(_acmTimeout > 0)
          {
            _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
          }

          //
          // Prevent that new batch requests are added while we are
          // flushing.
          //
          _batchStreamInUse = true;
      }

      try
      {
          synchronized(_sendMutex)
          {
            if(_transceiver == null) // Has the transceiver already been closed?
            {
                assert(_exception != null);
                throw _exception; // The exception is immutable at this point.
            }

            //
            // Send the batch request.
            //
            IceInternal.TraceUtil.traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
            _transceiver.write(stream, _endpoint.timeout());
          }
      }
      catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write().
      {
          synchronized(this)
          {
            setState(StateClosed, ex.get());
            assert(_exception != null);
            
            //
            // Since batch requests are all oneways (or datagrams), we
            // must report the exception to the caller.
            //
            throw _exception;
          }
      }
      catch(LocalException ex)
      {
          synchronized(this)
          {
            setState(StateClosed, ex);
            assert(_exception != null);
            
            //
            // Since batch requests are all oneways (or datagrams), we
            // must report the exception to the caller.
            //
            throw _exception;
          }
      }

      synchronized(this)
      {
          //
          // Reset the batch stream, and notify that flushing is over.
          //
          _batchStream = new IceInternal.BasicStream(_instance);
          _batchRequestNum = 0;
          _batchRequestCompress = false;
          _batchStreamInUse = false;
          notifyAll();
      }
    }

    public void
    sendResponse(IceInternal.BasicStream os, byte compressFlag)
    {
      IceInternal.BasicStream stream = null;
      try
      {
          synchronized(_sendMutex)
          {
            if(_transceiver == null) // Has the transceiver already been closed?
            {
                assert(_exception != null);
                throw _exception; // The exception is immutable at this point.
            }

            stream = doCompress(os, compressFlag != 0);

            //
            // Send the reply.
            //
            IceInternal.TraceUtil.traceReply("sending reply", os, _logger, _traceLevels);
            _transceiver.write(stream, _endpoint.timeout());
          }
      }
      catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write().
      {
          synchronized(this)
          {
            setState(StateClosed, ex.get());
          }
      }
      catch(LocalException ex)
      {
          synchronized(this)
          {
            setState(StateClosed, ex);
          }
      }

      synchronized(this)
      {
          assert(_state > StateNotValidated);

          try
          {
            if(--_dispatchCount == 0)
            {
                notifyAll();
            }
            
            if(_state == StateClosing && _dispatchCount == 0)
            {
                initiateShutdown();
            }

            if(_acmTimeout > 0)
            {
                _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
            }
          }
          catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write().
          {
            setState(StateClosed, ex.get());
          }
          catch(LocalException ex)
          {
            setState(StateClosed, ex);
          }
      }
    }

    public synchronized void
    sendNoResponse()
    {
      assert(_state > StateNotValidated);
      
      try
      {
          if(--_dispatchCount == 0)
          {
            notifyAll();
          }
          
          if(_state == StateClosing && _dispatchCount == 0)
          {
            initiateShutdown();
          }
      }
      catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write().
      {
          setState(StateClosed, ex.get());
      }
      catch(LocalException ex)
      {
          setState(StateClosed, ex);
      }
    }

    public IceInternal.EndpointI
    endpoint()
    {
        // No mutex protection necessary, _endpoint is immutable.
        return _endpoint;
    }

    public synchronized void
    setAdapter(ObjectAdapter adapter)
    {
      if(_exception != null)
      {
          throw _exception;
      }
      
      assert(_state < StateClosing);

      _adapter = adapter;

      if(_adapter != null)
      {
          _servantManager = ((ObjectAdapterI)_adapter).getServantManager();
          if(_servantManager == null)
          {
              _adapter = null;
          }
      }
      else
      {
          _servantManager = null;
      }

      //
      // We never change the thread pool with which we were
      // initially registered, even if we add or remove an object
      // adapter.
      //
    }

    public synchronized ObjectAdapter
    getAdapter()
    {
      return _adapter;
    }

    public synchronized ObjectPrx
    createProxy(Identity ident)
    {
        //
        // Create a reference and return a reverse proxy for this
        // reference.
        //
        ConnectionI[] connections = new ConnectionI[1];
        connections[0] = this;
        IceInternal.Reference ref = 
          _instance.referenceFactory().create(ident, _instance.getDefaultContext(), "",
                                                IceInternal.Reference.ModeTwoway, connections);
        return _instance.proxyFactory().referenceToProxy(ref);
    }

    //
    // Operations from EventHandler
    //

    public boolean
    datagram()
    {
      assert(!_instance.threadPerConnection()); // Only for use with a thread pool.
      return _endpoint.datagram(); // No mutex protection necessary, _endpoint is immutable.
    }

    public boolean
    readable()
    {
      assert(!_instance.threadPerConnection()); // Only for use with a thread pool.
        return true;
    }

    public boolean
    read(IceInternal.BasicStream stream)
    {
      assert(!_instance.threadPerConnection()); // Only for use with a thread pool.

      return _transceiver.read(stream, 0);

      //
      // Updating _acmAbsoluteTimeoutMillis is too expensive here,
      // because we would have to acquire a lock just for this
      // purpose. Instead, we update _acmAbsoluteTimeoutMillis in
      // message().
      //
    }

    public void
    message(IceInternal.BasicStream stream, IceInternal.ThreadPool threadPool)
    {
      assert(!_instance.threadPerConnection()); // Only for use with a thread pool.

      MessageInfo info = new MessageInfo(stream);

        synchronized(this)
        {
          //
          // We must promote within the synchronization, otherwise
          // there could be various race conditions with close
          // connection messages and other messages.
          //
          threadPool.promoteFollower();

            if(_state != StateClosed)
            {
            parseMessage(info);
            }

          //
          // parseMessage() can close the connection, so we must check
          // for closed state again.
          //
          if(_state == StateClosed)
          {
            return;
          }
        }

      //
      // Asynchronous replies must be handled outside the thread
      // synchronization, so that nested calls are possible.
      //
      if(info.outAsync != null)
      {
          info.outAsync.__finished(info.stream);
      }

      //
      // Method invocation (or multiple invocations for batch messages)
      // must be done outside the thread synchronization, so that nested
      // calls are possible.
      //
      invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter);
    }

    public void
    finished(IceInternal.ThreadPool threadPool)
    {
      assert(!_instance.threadPerConnection()); // Only for use with a thread pool.

      threadPool.promoteFollower();

      LocalException localEx = null;

      IceInternal.IntMap requests = null;
      IceInternal.IntMap asyncRequests = null;

      synchronized(this)
      {
          --_finishedCount;
          if(_finishedCount == 0 && _state == StateClosed)
          {
            //
            // We must make sure that nobody is sending when we
            // close the transceiver.
            //
            synchronized(_sendMutex)
            {
                try
                {
                  _transceiver.close();
                }
                catch(LocalException ex)
                {
                  localEx = ex;
                }
                
                _transceiver = null;
                notifyAll();
            }
          }

          if(_state == StateClosed || _state == StateClosing)
          {
            requests = _requests;
            _requests = new IceInternal.IntMap();

            asyncRequests = _asyncRequests;
            _asyncRequests = new IceInternal.IntMap();
          }
      }

      if(requests != null)
      {
          java.util.Iterator i = requests.entryIterator();
          while(i.hasNext())
          {
            IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next();
            IceInternal.Outgoing out = (IceInternal.Outgoing)e.getValue();
            out.finished(_exception); // The exception is immutable at this point.
          }
      }
      
      if(asyncRequests != null)
      {
          java.util.Iterator i = asyncRequests.entryIterator();
          while(i.hasNext())
          {
            IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next();
            IceInternal.OutgoingAsync out = (IceInternal.OutgoingAsync)e.getValue();
            out.__finished(_exception); // The exception is immutable at this point.
          }
      }

      if(localEx != null)
      {
          throw localEx;
      }
    }

    public synchronized void
    exception(LocalException ex)
    {
      setState(StateClosed, ex);
    }

    public String
    type()
    {
      return _type; // No mutex lock, _type is immutable.
    }

    public int
    timeout()
    {
      return _endpoint.timeout(); // No mutex protection necessary, _endpoint is immutable.
    }

    public String
    toString()
    {
      return _toString();
    }

    public String
    _toString()
    {
      return _desc; // No mutex lock, _desc is immutable.
    }

    //
    // Only used by the SSL plug-in.
    //
    // The external party has to synchronize the connection, since the
    // connection is the object that protects the transceiver.
    //
    public IceInternal.Transceiver
    getTransceiver()
    {
      return _transceiver;
    }

    public ConnectionI(IceInternal.Instance instance, IceInternal.Transceiver transceiver, 
                   IceInternal.EndpointI endpoint, ObjectAdapter adapter)
    {
        super(instance);
        _transceiver = transceiver;
      _desc = transceiver.toString();
        _type = transceiver.type();
        _endpoint = endpoint;
        _adapter = adapter;
        _logger = instance.initializationData().logger; // Cached for better performance.
        _traceLevels = instance.traceLevels(); // Cached for better performance.
      _registeredWithPool = false;
      _finishedCount = 0;
      _warn = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false;
      _acmAbsoluteTimeoutMillis = 0;
        _nextRequestId = 1;
        _batchStream = new IceInternal.BasicStream(instance);
      _batchStreamInUse = false;
      _batchRequestNum = 0;
      _batchRequestCompress = false;
        _dispatchCount = 0;
        _state = StateNotValidated;
      _stateTime = System.currentTimeMillis();

      if(_endpoint.datagram())
      {
          _acmTimeout = 0;
      }
      else
      {
          if(_adapter != null)
          {
            _acmTimeout = _instance.serverACM();
          }
          else
          {
            _acmTimeout = _instance.clientACM();
          }
      }

      int compressionLevel =
          _instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Compression.Level", 1);
      if(compressionLevel < 1)
      {
          compressionLevel = 1;
      }
      else if(compressionLevel > 9)
      {
          compressionLevel = 9;
      }
      _compressionLevel = compressionLevel;

      if(_adapter != null)
      {
          _servantManager = ((ObjectAdapterI)_adapter).getServantManager();
      }
      else
      {
          _servantManager = null;
      }

      try
      {
          if(!_instance.threadPerConnection())
          {
            //
            // Only set _threadPool if we really need it, i.e., if we are
            // not in thread per connection mode. Thread pools have lazy
            // initialization in Instance, and we don't want them to be
            // created if they are not needed.
            //
            if(_adapter != null)
            {
                _threadPool = ((ObjectAdapterI)_adapter).getThreadPool();
            }
            else
            {
                _threadPool = _instance.clientThreadPool();
            }
          }
          else
          {
            _threadPool = null;
            
            //
            // If we are in thread per connection mode, create the thread
            // for this connection.
            //
            _threadPerConnection = new ThreadPerConnection();
            _threadPerConnection.start();
          }
      }
      catch(java.lang.Exception ex)
      {
          java.io.StringWriter sw = new java.io.StringWriter();
          java.io.PrintWriter pw = new java.io.PrintWriter(sw);
          ex.printStackTrace(pw);
          pw.flush();
          String s;
          if(_instance.threadPerConnection())
          {
            s = "cannot create thread for connection:\n";
          }
          else
          {
            s = "cannot create thread pool for connection:\n";
          }
          s += sw.toString();
          _logger.error(s);
          
          try
          {
            _transceiver.close();
          }
          catch(LocalException e)
          {
            // Here we ignore any exceptions in close().
          }
          
          Ice.SyscallException e = new Ice.SyscallException();
          e.initCause(ex);
          throw e;
      }

      _overrideCompress = _instance.defaultsAndOverrides().overrideCompress;
      _overrideCompressValue = _instance.defaultsAndOverrides().overrideCompressValue;
    }

    protected synchronized void
    finalize()
        throws Throwable
    {
      IceUtil.Assert.FinalizerAssert(_state == StateClosed);
      IceUtil.Assert.FinalizerAssert(_transceiver == null);
      IceUtil.Assert.FinalizerAssert(_dispatchCount == 0);
      IceUtil.Assert.FinalizerAssert(_threadPerConnection == null);

        super.finalize();
    }

    private static final int StateNotValidated = 0;
    private static final int StateActive = 1;
    private static final int StateHolding = 2;
    private static final int StateClosing = 3;
    private static final int StateClosed = 4;

    private void
    setState(int state, LocalException ex)
    {
      //
      // If setState() is called with an exception, then only closed
      // and closing states are permissible.
      //
      assert(state == StateClosing || state == StateClosed);

        if(_state == state) // Don't switch twice.
        {
            return;
        }

        if(_exception == null)
        {
          _exception = ex;

            if(_warn)
            {
            //
            // We don't warn if we are not validated.
            //
            if(_state > StateNotValidated)
            {
                //
                // Don't warn about certain expected exceptions.
                //
                if(!(_exception instanceof CloseConnectionException ||
                   _exception instanceof ForcedCloseConnectionException ||
                   _exception instanceof ConnectionTimeoutException ||
                   _exception instanceof CommunicatorDestroyedException ||
                   _exception instanceof ObjectAdapterDeactivatedException ||
                   (_exception instanceof ConnectionLostException && _state == StateClosing)))
                {
                  warning("connection exception", _exception);
                }
            }
            }
        }

      //
      // We must set the new state before we notify requests of any
      // exceptions. Otherwise new requests may retry on a
      // connection that is not yet marked as closed or closing.
      //
        setState(state);
    }

    private void
    setState(int state)
    {
        //
        // We don't want to send close connection messages if the endpoint
        // only supports oneway transmission from client to server.
        //
        if(_endpoint.datagram() && state == StateClosing)
        {
            state = StateClosed;
        }

      //
      // Skip graceful shutdown if we are destroyed before validation.
      //
      if(_state == StateNotValidated && state == StateClosing)
      {
          state = StateClosed;
      }

        if(_state == state) // Don't switch twice.
        {
            return;
        }

        switch(state)
        {
          case StateNotValidated:
          {
            assert(false);
            break;
          }

            case StateActive:
            {
            //
            // Can only switch from holding or not validated to
            // active.
            //
                if(_state != StateHolding && _state != StateNotValidated)
                {
                    return;
                }
            if(!_instance.threadPerConnection())
            {
                registerWithPool();
            }
                break;
            }

            case StateHolding:
            {
            //
            // Can only switch from active or not validated to
            // holding.
            //
            if(_state != StateActive && _state != StateNotValidated)
            {
                    return;
                }
            if(!_instance.threadPerConnection())
            {
                unregisterWithPool();
            }
                break;
            }

            case StateClosing:
            {
            //
            // Can't change back from closed.
            //
                if(_state == StateClosed)
                {
                    return;
                }
            if(!_instance.threadPerConnection())
            {
                registerWithPool(); // We need to continue to read in closing state.
            }
                break;
            }
          
            case StateClosed:
            {
            if(_instance.threadPerConnection())
            {
                //
                // If we are in thread per connection mode, we
                // shutdown both for reading and writing. This will
                // unblock and read call with an exception. The thread
                // per connection then closes the transceiver.
                //
                _transceiver.shutdownReadWrite();
            }
            else if(_state == StateNotValidated)
            {
                //
                // If we change from not validated, we can close right
                // away.
                //
                assert(!_registeredWithPool);

                //
                // We must make sure that nobody is sending when we
                // close the transceiver.
                //
                synchronized(_sendMutex)
                {
                  try
                  {
                      _transceiver.close();
                  }
                  catch(LocalException ex)
                  {
                      // Here we ignore any exceptions in close().
                  }

                  _transceiver = null;
                  //notifyAll(); // We notify already below.
                }
            }
            else
            {
                //
                // Otherwise we first must make sure that we are
                // registered, then we unregister, and let finished()
                // do the close.
                //
                registerWithPool();
                unregisterWithPool();

                //
                // We must prevent any further writes when _state == StateClosed.
                // However, functions such as sendResponse cannot acquire the main
                // mutex in order to check _state. Therefore we shut down the write
                // end of the transceiver, which causes subsequent write attempts
                // to fail with an exception.
                //
                _transceiver.shutdownWrite();
            }
            break;
            }
        }

      //  
      // We only register with the connection monitor if our new state
      // is StateActive. Otherwise we unregister with the connection
      // monitor, but only if we were registered before, i.e., if our
      // old state was StateActive.
      //
      IceInternal.ConnectionMonitor connectionMonitor = _instance.connectionMonitor();
      if(connectionMonitor != null)
      {
          if(state == StateActive)
          {
            connectionMonitor.add(this);
          }
          else if(_state == StateActive)
          {
            connectionMonitor.remove(this);
          }
      }

        _state = state;
      _stateTime = System.currentTimeMillis();

      notifyAll();

        if(_state == StateClosing && _dispatchCount == 0)
        {
            try
            {
                initiateShutdown();
            }
            catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write().
            {
                setState(StateClosed, ex.get());
            }
            catch(LocalException ex)
            {
                setState(StateClosed, ex);
            }
        }
    }

    private void
    initiateShutdown()
      throws IceInternal.LocalExceptionWrapper // Java-specific workaround in Transceiver.write().
    {
      assert(_state == StateClosing);
      assert(_dispatchCount == 0);

      if(!_endpoint.datagram())
      {
          synchronized(_sendMutex)
          {
            //
            // Before we shut down, we send a close connection
            // message.
            //
            IceInternal.BasicStream os = new IceInternal.BasicStream(_instance);
            os.writeBlob(IceInternal.Protocol.magic);
            os.writeByte(IceInternal.Protocol.protocolMajor);
            os.writeByte(IceInternal.Protocol.protocolMinor);
            os.writeByte(IceInternal.Protocol.encodingMajor);
            os.writeByte(IceInternal.Protocol.encodingMinor);
            os.writeByte(IceInternal.Protocol.closeConnectionMsg);
            os.writeByte(_compressionSupported ? (byte)1 : (byte)0);
            os.writeInt(IceInternal.Protocol.headerSize); // Message size.
            
            //
            // Send the message.
            //
            IceInternal.TraceUtil.traceHeader("sending close connection", os, _logger, _traceLevels);
            _transceiver.write(os, _endpoint.timeout());
            //
            // The CloseConnection message should be sufficient. Closing the write
            // end of the socket is probably an artifact of how things were done
            // in IIOP. In fact, shutting down the write end of the socket causes
            // problems on Windows by preventing the peer from using the socket.
            // For example, the peer is no longer able to continue writing a large
            // message after the socket is shutdown.
            //
            //_transceiver.shutdownWrite();
          }
      }
    }

    private void
    registerWithPool()
    {
      assert(!_instance.threadPerConnection()); // Only for use with a thread pool.

      if(!_registeredWithPool)
      {
          _threadPool._register(_transceiver.fd(), this);
          _registeredWithPool = true;
      }
    }

    private void
    unregisterWithPool()
    {
      assert(!_instance.threadPerConnection()); // Only for use with a thread pool.

      if(_registeredWithPool)
      {
          _threadPool.unregister(_transceiver.fd());
          _registeredWithPool = false;    
          ++_finishedCount; // For each unregistration, finished() is called once.
      }
    }

    private IceInternal.BasicStream
    doCompress(IceInternal.BasicStream uncompressed, boolean compress)
    {
      if(_compressionSupported)
      {
          if(compress && uncompressed.size() >= 100)
          {
            //
            // Do compression.
            //
            IceInternal.BasicStream cstream = uncompressed.compress(IceInternal.Protocol.headerSize,
                                                      _compressionLevel);
            if(cstream != null)
            {
                //
                // Set compression status.
                //
                cstream.pos(9);
                cstream.writeByte((byte)2);

                //
                // Write the size of the compressed stream into the header.
                //
                cstream.pos(10);
                cstream.writeInt(cstream.size());

                //
                // Write the compression status and size of the compressed stream into the header of the
                // uncompressed stream -- we need this to trace requests correctly.
                //
                uncompressed.pos(9);
                uncompressed.writeByte((byte)2);
                uncompressed.writeInt(cstream.size());

                return cstream;
            }
          }
      }

      uncompressed.pos(9);
      uncompressed.writeByte((byte)((_compressionSupported && compress) ? 1 : 0));

      //
      // Not compressed, fill in the message size.
      //
      uncompressed.pos(10);
      uncompressed.writeInt(uncompressed.size());

      return uncompressed;
    }

    private static class MessageInfo
    {
      MessageInfo(IceInternal.BasicStream stream)
      {
          this.stream = stream;
      }

      IceInternal.BasicStream stream;
      int invokeNum;
      int requestId;
      byte compress;
      IceInternal.ServantManager servantManager;
      ObjectAdapter adapter;
      IceInternal.OutgoingAsync outAsync;
    }

    private void
    parseMessage(MessageInfo info)
    {
      assert(_state > StateNotValidated && _state < StateClosed);

      if(_acmTimeout > 0)
      {
          _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
      }

      try
      {
          //
          // We don't need to check magic and version here. This has
          // already been done by the ThreadPool or ThreadPerConnection,
          // which provides us with the stream.
          //
          assert(info.stream.pos() == info.stream.size());
          info.stream.pos(8);
          byte messageType = info.stream.readByte();
          info.compress = info.stream.readByte();
          if(info.compress == (byte)2)
          {
            if(_compressionSupported)
            {
                IceInternal.BasicStream ustream = info.stream.uncompress(IceInternal.Protocol.headerSize);
                if(ustream != info.stream)
                {
                  info.stream = ustream;
                }
            }
            else
            {
                FeatureNotSupportedException ex = new FeatureNotSupportedException();
                ex.unsupportedFeature = "Cannot uncompress compressed message: "
                  + "org.apache.tools.bzip2.CBZip2OutputStream was not found";
                throw ex;
            }
          }
          info.stream.pos(IceInternal.Protocol.headerSize);

          switch(messageType)
          {
            case IceInternal.Protocol.closeConnectionMsg:
            {
                IceInternal.TraceUtil.traceHeader("received close connection", info.stream, _logger, _traceLevels);
                if(_endpoint.datagram())
                {
                    if(_warn)
                    {
                      _logger.warning("ignoring close connection message for datagram connection:\n" + _desc);
                    }
                }
                else
                {
                  setState(StateClosed, new CloseConnectionException());
                }
                break;
            }

            case IceInternal.Protocol.requestMsg:
            {
                if(_state == StateClosing)
                {
                  IceInternal.TraceUtil.traceRequest("received request during closing\n" +
                                             "(ignored by server, client will retry)",
                                             info.stream, _logger, _traceLevels);
                }
                else
                {
                  IceInternal.TraceUtil.traceRequest("received request", info.stream, _logger, _traceLevels);
                  info.requestId = info.stream.readInt();
                  info.invokeNum = 1;
                  info.servantManager = _servantManager;
                  info.adapter = _adapter;
                  ++_dispatchCount;
                }
                break;
            }

            case IceInternal.Protocol.requestBatchMsg:
            {
                if(_state == StateClosing)
                {
                  IceInternal.TraceUtil.traceBatchRequest("received batch request during closing\n" +
                                                "(ignored by server, client will retry)",
                                                info.stream, _logger, _traceLevels);
                }
                else
                {
                  IceInternal.TraceUtil.traceBatchRequest("received batch request", info.stream, _logger,
                                                _traceLevels);
                  info.invokeNum = info.stream.readInt();
                  if(info.invokeNum < 0)
                  {
                      info.invokeNum = 0;
                      throw new NegativeSizeException();
                  }
                  info.servantManager = _servantManager;
                  info.adapter = _adapter;
                  _dispatchCount += info.invokeNum;
                }
                break;
            }

            case IceInternal.Protocol.replyMsg:
            {
                IceInternal.TraceUtil.traceReply("received reply", info.stream, _logger, _traceLevels);
                info.requestId = info.stream.readInt();
                IceInternal.Outgoing out = (IceInternal.Outgoing)_requests.remove(info.requestId);
                if(out != null)
                {
                  out.finished(info.stream);
                }
                else
                {
                  info.outAsync = (IceInternal.OutgoingAsync)_asyncRequests.remove(info.requestId);
                  if(info.outAsync == null)
                  {
                      throw new UnknownRequestIdException();
                  }
                }
                break;
            }

            case IceInternal.Protocol.validateConnectionMsg:
            {
                IceInternal.TraceUtil.traceHeader("received validate connection", info.stream, _logger,
                                          _traceLevels);
                if(_warn)
                {
                  _logger.warning("ignoring unexpected validate connection message:\n" + _desc);
                }
                break;
            }

            default:
            {
                IceInternal.TraceUtil.traceHeader("received unknown message\n" +
                                          "(invalid, closing connection)", info.stream, _logger,
                                          _traceLevels);
                throw new UnknownMessageException();
            }
          }
      }
      catch(SocketException ex)
      {
          setState(StateClosed, ex);
      }
      catch(LocalException ex)
      {
          if(_endpoint.datagram())
          {
              if(_warn)
            {
                _logger.warning("udp connection exception:\n" + ex + _desc);
            }
          }
          else
          {
              setState(StateClosed, ex);
          }
      }
    }

    private void
    invokeAll(IceInternal.BasicStream stream, int invokeNum, int requestId, byte compress,
            IceInternal.ServantManager servantManager, ObjectAdapter adapter)
    {
      //
      // Note: In contrast to other private or protected methods, this
      // operation must be called *without* the mutex locked.
      //

      IceInternal.Incoming in = null;
      try
      {
          while(invokeNum > 0)
          {
            
            //
            // Prepare the invocation.
            //
            boolean response = !_endpoint.datagram() && requestId != 0;
            in = getIncoming(adapter, response, compress, requestId);
            IceInternal.BasicStream is = in.is();
            stream.swap(is);
            IceInternal.BasicStream os = in.os();
            
            //
            // Prepare the response if necessary.
            //
            if(response)
            {
                assert(invokeNum == 1); // No further invocations if a response is expected.
                os.writeBlob(IceInternal.Protocol.replyHdr);
                
                //
                // Add the request ID.
                //
                os.writeInt(requestId);
            }
            
            in.invoke(servantManager);
            
            //
            // If there are more invocations, we need the stream back.
            //
            if(--invokeNum > 0)
            {
                stream.swap(is);
                }

            reclaimIncoming(in);
            in = null;
          }
      }
      catch(LocalException ex)
      {
          synchronized(this)
          {
            setState(StateClosed, ex);
          }
      }
      catch(java.lang.AssertionError ex) // Upon assertion, we print the stack trace.
      {
          synchronized(this)
          {
            UnknownException uex = new UnknownException();
            java.io.StringWriter sw = new java.io.StringWriter();
            java.io.PrintWriter pw = new java.io.PrintWriter(sw);
            ex.printStackTrace(pw);
            pw.flush();
            uex.unknown = sw.toString();
            _logger.error(uex.unknown);
            setState(StateClosed, uex);
          }
      }
      catch(java.lang.Exception ex)
      {
          synchronized(this)
          {
            UnknownException uex = new UnknownException();
            java.io.StringWriter sw = new java.io.StringWriter();
            java.io.PrintWriter pw = new java.io.PrintWriter(sw);
            ex.printStackTrace(pw);
            pw.flush();
            uex.unknown = sw.toString();
            setState(StateClosed, uex);
          }
      }
      finally
      {
          if(in != null)
          {
            reclaimIncoming(in);
          }
      }
            
      //
      // If invoke() above raised an exception, and therefore
      // neither sendResponse() nor sendNoResponse() has been
      // called, then we must decrement _dispatchCount here.
      //
      if(invokeNum > 0)
      {
          synchronized(this)
          {
            assert(_dispatchCount > 0);
            _dispatchCount -= invokeNum;
            assert(_dispatchCount >= 0);
            if(_dispatchCount == 0)
            {
                notifyAll();
            }
          }
      }
    }

    private void
    run()
    {
      //
      // For non-datagram connections, the thread-per-connection
      // must validate and activate this connection, and not in the
      // connection factory. Please see the comments in the
      // connection factory for details.
      //
      if(!_endpoint.datagram())
      {
          try
          {
            validate();
          }
          catch(LocalException ex)
          {
            synchronized(this)
            {
                assert(_state == StateClosed);
                
                //
                // We must make sure that nobody is sending when
                // we close the transceiver.
                //
                synchronized(_sendMutex)
                {
                    if(_transceiver != null)
                  {
                      try
                      {
                          _transceiver.close();
                      }
                      catch(LocalException e)
                      {
                          // Here we ignore any exceptions in close().
                      }
                  
                      _transceiver = null;
                  }
                  notifyAll();
                }
            }
            return;
          }
          
          activate();
      }

      boolean warnUdp = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0;

      boolean closed = false;

      IceInternal.BasicStream stream = new IceInternal.BasicStream(_instance);

      while(!closed)
      {
          //
          // We must accept new connections outside the thread
          // synchronization, because we use blocking accept.
          //

          try
          {
            try
            {
                stream.resize(IceInternal.Protocol.headerSize, true);
                stream.pos(0);
                _transceiver.read(stream, -1);

                int pos = stream.pos();
                if(pos < IceInternal.Protocol.headerSize)
                {
                  //
                  // This situation is possible for small UDP packets.
                  //
                  throw new IllegalMessageSizeException();
                }
                stream.pos(0);
                byte[] m = stream.readBlob(4);
                if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] ||
                   m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3])
                {
                  BadMagicException ex = new BadMagicException();
                  ex.badMagic = m;
                  throw ex;
                }
                byte pMajor = stream.readByte();
                byte pMinor = stream.readByte();
                if(pMajor != IceInternal.Protocol.protocolMajor)
                {
                  UnsupportedProtocolException e = new UnsupportedProtocolException();
                  e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor;
                  e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor;
                  e.major = IceInternal.Protocol.protocolMajor;
                  e.minor = IceInternal.Protocol.protocolMinor;
                  throw e;
                }
                byte eMajor = stream.readByte();
                byte eMinor = stream.readByte();
                if(eMajor != IceInternal.Protocol.encodingMajor)
                {
                  UnsupportedEncodingException e = new UnsupportedEncodingException();
                  e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor;
                  e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor;
                  e.major = IceInternal.Protocol.encodingMajor;
                  e.minor = IceInternal.Protocol.encodingMinor;
                  throw e;
                }
                byte messageType = stream.readByte();
                byte compress = stream.readByte();
                int size = stream.readInt();
                if(size < IceInternal.Protocol.headerSize)
                {
                  throw new IllegalMessageSizeException();
                }
                if(size > _instance.messageSizeMax())
                {
                  throw new MemoryLimitException();
                }
                if(size > stream.size())
                {
                  stream.resize(size, true);
                }
                stream.pos(pos);

                if(pos != stream.size())
                {
                  if(_endpoint.datagram())
                  {
                      if(warnUdp)
                      {
                        _logger.warning("DatagramLimitException: maximum size of " + pos + " exceeded");
                      }
                      throw new DatagramLimitException();
                  }
                  else
                  {
                      _transceiver.read(stream, -1);
                      assert(stream.pos() == stream.size());
                  }
                }
            }
            catch(DatagramLimitException ex) // Expected.
            {
                continue;
            }
            catch(SocketException ex)
            {
                exception(ex);
            }
            catch(LocalException ex)
            {
                  if(_endpoint.datagram())
                  {
                      if(_warn)
                    {
                        _logger.warning("datagram connection exception:\n" + ex + "\n" + _desc);
                    }
                  continue;
                  }
                  else
                  {
                    exception(ex);
                }
            }

            MessageInfo info = new MessageInfo(stream);

            LocalException localEx = null;

            IceInternal.IntMap requests = null;
            IceInternal.IntMap asyncRequests = null;

            synchronized(this)
            {
                while(_state == StateHolding)
                {
                  try
                  {
                      wait();
                  }
                  catch(InterruptedException ex)
                  {
                  }
                }
            
                if(_state != StateClosed)
                {
                  parseMessage(info);
                }

                //
                // parseMessage() can close the connection, so we must
                // check for closed state again.
                //
                if(_state == StateClosed)
                {
                  //
                  // We must make sure that nobody is sending when we close
                  // the transceiver.
                  //
                  synchronized(_sendMutex)
                  {
                      try
                      {
                        _transceiver.close();
                      }
                      catch(LocalException ex)
                      {
                        localEx = ex;
                      }
                      
                      _transceiver = null;
                      notifyAll();
                  }

                  //
                  // We cannot simply return here. We have to make sure
                  // that all requests (regular and async) are notified
                  // about the closed connection below.
                  //
                  closed = true;
                }

                if(_state == StateClosed || _state == StateClosing)
                {
                  requests = _requests;
                  _requests = new IceInternal.IntMap();

                  asyncRequests = _asyncRequests;
                  _asyncRequests = new IceInternal.IntMap();
                }
            }

            //
            // Asynchronous replies must be handled outside the thread
            // synchronization, so that nested calls are possible.
            //
            if(info.outAsync != null)
            {
                info.outAsync.__finished(info.stream);
            }
            
            //
            // Method invocation (or multiple invocations for batch messages)
            // must be done outside the thread synchronization, so that nested
            // calls are possible.
            //
            invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager,
                    info.adapter);

            if(requests != null)
            {
                java.util.Iterator i = requests.entryIterator();
                while(i.hasNext())
                {
                  IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next();
                  IceInternal.Outgoing out = (IceInternal.Outgoing)e.getValue();
                  out.finished(_exception); // The exception is immutable at this point.
                }
            }

            if(asyncRequests != null)
            {
                java.util.Iterator i = asyncRequests.entryIterator();
                while(i.hasNext())
                {
                  IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next();
                  IceInternal.OutgoingAsync out = (IceInternal.OutgoingAsync)e.getValue();
                  out.__finished(_exception); // The exception is immutable at this point.
                }
            }

            if(localEx != null)
            {
                assert(closed);
                throw localEx;
            }    
          }
          finally
          {
            stream.reset();
          }
      }
    }

    private void
    warning(String msg, Exception ex)
    {
        java.io.StringWriter sw = new java.io.StringWriter();
        java.io.PrintWriter pw = new java.io.PrintWriter(sw);
      ex.printStackTrace(pw);
        pw.flush();
        String s = msg + ":\n" + _desc + "\n" + sw.toString();
        _logger.warning(s);
    }

    private void
    error(String msg, Exception ex)
    {
        java.io.StringWriter sw = new java.io.StringWriter();
        java.io.PrintWriter pw = new java.io.PrintWriter(sw);
      ex.printStackTrace(pw);
        pw.flush();
        String s = msg + ":\n" + _desc + "\n" + sw.toString();
        _logger.error(s);
    }

    private IceInternal.Incoming
    getIncoming(ObjectAdapter adapter, boolean response, byte compress, int requestId)
    {
        IceInternal.Incoming in = null;

        synchronized(_incomingCacheMutex)
        {
            if(_incomingCache == null)
            {
                in = new IceInternal.Incoming(_instance, this, adapter, response, compress, requestId);
            }
            else
            {
                in = _incomingCache;
                _incomingCache = _incomingCache.next;
            in.reset(_instance, this, adapter, response, compress, requestId);
                in.next = null;
            }
        }

        return in;
    }

    private void
    reclaimIncoming(IceInternal.Incoming in)
    {
        synchronized(_incomingCacheMutex)
        {
          in.next = _incomingCache;
          _incomingCache = in;
          //
          // Clear references to Ice objects as soon as possible.
          //
          _incomingCache.reclaim();
        }
    }

    public IceInternal.Outgoing
    getOutgoing(IceInternal.Reference reference, String operation, OperationMode mode, java.util.Map context,
            boolean compress)
      throws IceInternal.LocalExceptionWrapper
    {
      IceInternal.Outgoing out = null;

      synchronized(_outgoingCacheMutex)
      {
          if(_outgoingCache == null)
          {
            out = new IceInternal.Outgoing(this, reference, operation, mode, context, compress);
          }
          else
          {
            out = _outgoingCache;
            _outgoingCache = _outgoingCache.next;
            out.reset(reference, operation, mode, context, compress);
            out.next = null;
          }
      }

      return out;
    }

    public void
    reclaimOutgoing(IceInternal.Outgoing out)
    {
      //
      // Clear references to Ice objects as soon as possible.
      //
      out.reclaim();

      synchronized(_outgoingCacheMutex)
      {
          out.next = _outgoingCache;
          _outgoingCache = out;
      }
    }

    private class ThreadPerConnection extends Thread
    {
      public void
      run()
      {
          if(ConnectionI.this._instance.initializationData().threadHook != null)
          {
              ConnectionI.this._instance.initializationData().threadHook.start();
          }
          
          try
          {
            ConnectionI.this.run();
          }
          catch(Exception ex)
          {
            ConnectionI.this.error("exception in thread per connection", ex);
          }
          finally
          {
              if(ConnectionI.this._instance.initializationData().threadHook != null)
              {
                  ConnectionI.this._instance.initializationData().threadHook.stop();
              }
          }
      }
    }
    private Thread _threadPerConnection;

    private IceInternal.Transceiver _transceiver;
    private final String _desc;
    private final String _type;
    private final IceInternal.EndpointI _endpoint;

    private ObjectAdapter _adapter;
    private IceInternal.ServantManager _servantManager;

    private final Logger _logger;
    private final IceInternal.TraceLevels _traceLevels;

    private boolean _registeredWithPool;
    private int _finishedCount;
    private final IceInternal.ThreadPool _threadPool;

    private final boolean _warn;

    private final int _acmTimeout;
    private long _acmAbsoluteTimeoutMillis;

    private final int _compressionLevel;

    private int _nextRequestId;

    private IceInternal.IntMap _requests = new IceInternal.IntMap();
    private IceInternal.IntMap _asyncRequests = new IceInternal.IntMap();

    private LocalException _exception;

    private IceInternal.BasicStream _batchStream;
    private boolean _batchStreamInUse;
    private int _batchRequestNum;
    private boolean _batchRequestCompress;

    private int _dispatchCount;

    private int _state; // The current state.
    private long _stateTime; // The last time when the state was changed.

    //
    // We have a separate mutex for sending, so that we don't block
    // the whole connection when we do a blocking send.
    //
    private java.lang.Object _sendMutex = new java.lang.Object();

    private IceInternal.Incoming _incomingCache;
    private java.lang.Object _incomingCacheMutex = new java.lang.Object();

    private IceInternal.Outgoing _outgoingCache;
    private java.lang.Object _outgoingCacheMutex = new java.lang.Object();

    private static boolean _compressionSupported = IceInternal.BasicStream.compressible();

    private boolean _overrideCompress;
    private boolean _overrideCompressValue;
}

Generated by  Doxygen 1.6.0   Back to index